returns 动态数据类型的 Apache Spark UDF

Apache Spark UDF that returns dynamic data types

我有 UDF 每行处理 JSON 和 returns 动态数据结果。在我的例子中,我需要它来验证数据和 return 验证数据。

架构对于每一行都是灵活的。这意味着我无法为每个案例创建 case class(我的一些数据可以嵌套)。

我尝试从我的 UDF 函数中 return 元组,但我也没有成功(因为我需要从列表转换为元组),而且我没有找到一个优雅的解决方案为此。

我正在 return 的数据类型是 StringIntegerDoubleDateTime,顺序不同。

我尝试在 DataFrame 上使用 map,但我的架构有问题。

import spark.implicits._

def processData(row_type: String) = {
  /*
  completely random output here. Tuple/List/Array of 
  elements with a type Integer, String, Double, DateType.
  */

  // pseudo-code starts here

  if row_type == A
     (1, "second", 3)
  else
     (1, "second", 3, 4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()

结果

+------------+
|      UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+

我该如何解决这个问题?每个 row_type 我们有不同的处理结果。所有 row_type 都是动态设置的。我可以为每个 row_type 很好地 Schema,但我不能用不同的模式生成相同的 UDF return 结果。

这里使用 map 是唯一的方法吗?

Spark Dataset 是一个列式数据结构,这里真的没有地方放灵活的模式。架构必须是同类的(所有行必须具有相同的一般结构)并且预先已知(如果您使用 UDF,它必须 return 明确定义 SQL 类型)。

您可以通过以下方式实现一些灵活性:

  • 正在定义表示所有可能字段的超集的架构,并将各个列标记为 nullable。这只有在没有类型冲突时才有可能(如果 Row 包含字段 foo,它总是使用相同的 SQL 类型表示)。
  • 使用集合类型(MapTypeArrayType)来表示可变大小的字段。所有值和/或键必须是同一类型。
  • 将原始数据重塑到可以用固定模式实际表示的程度。 Spark 包含 json4s, which provides a set of tools for merging, diffing and querying JSON 数据作为其依赖项。如果需要,它可用于应用相对复杂的转换。

如果这不切实际,我建议保留 JSON 字段 "as is" 并仅解析它 on-demand 以提取特定值。您可以使用 get_json_object 和显式类型转换。这允许测试不同的场景:

coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

不假设单个文档结构。

您可以使用二进制 Encoders (Encoders.kryo, Encoders.java) 或 RDD API 获得更大的灵活性,它们可用于存储联合类型(甚至 Any),但如果您真的希望 完全随机输出 ,则表明存在一些严重的设计或数据建模问题。即使您可以存储解析后的数据,也很难使用它。