returns 动态数据类型的 Apache Spark UDF
Apache Spark UDF that returns dynamic data types
我有 UDF
每行处理 JSON 和 returns 动态数据结果。在我的例子中,我需要它来验证数据和 return 验证数据。
架构对于每一行都是灵活的。这意味着我无法为每个案例创建 case class
(我的一些数据可以嵌套)。
我尝试从我的 UDF 函数中 return 元组,但我也没有成功(因为我需要从列表转换为元组),而且我没有找到一个优雅的解决方案为此。
我正在 return 的数据类型是 String
、Integer
、Double
、DateTime
,顺序不同。
我尝试在 DataFrame 上使用 map
,但我的架构有问题。
import spark.implicits._
def processData(row_type: String) = {
/*
completely random output here. Tuple/List/Array of
elements with a type Integer, String, Double, DateType.
*/
// pseudo-code starts here
if row_type == A
(1, "second", 3)
else
(1, "second", 3, 4)
}
val processDataUDF = udf((row_type: String) => processData(row_type))
val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()
结果
+------------+
| UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+
我该如何解决这个问题?每个 row_type
我们有不同的处理结果。所有 row_type
都是动态设置的。我可以为每个 row_type
很好地 Schema
,但我不能用不同的模式生成相同的 UDF return 结果。
这里使用 map
是唯一的方法吗?
Spark Dataset
是一个列式数据结构,这里真的没有地方放灵活的模式。架构必须是同类的(所有行必须具有相同的一般结构)并且预先已知(如果您使用 UDF,它必须 return 明确定义 SQL 类型)。
您可以通过以下方式实现一些灵活性:
- 正在定义表示所有可能字段的超集的架构,并将各个列标记为
nullable
。这只有在没有类型冲突时才有可能(如果 Row
包含字段 foo
,它总是使用相同的 SQL 类型表示)。
- 使用集合类型(
MapType
、ArrayType
)来表示可变大小的字段。所有值和/或键必须是同一类型。
- 将原始数据重塑到可以用固定模式实际表示的程度。 Spark 包含
json4s
, which provides a set of tools for merging, diffing and querying JSON 数据作为其依赖项。如果需要,它可用于应用相对复杂的转换。
如果这不切实际,我建议保留 JSON 字段 "as is" 并仅解析它 on-demand 以提取特定值。您可以使用 get_json_object
和显式类型转换。这允许测试不同的场景:
coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
.map(get_json_object($"json_col", _)): _*).cast(DoubleType)
不假设单个文档结构。
您可以使用二进制 Encoders
(Encoders.kryo
, Encoders.java
) 或 RDD
API 获得更大的灵活性,它们可用于存储联合类型(甚至 Any
),但如果您真的希望 完全随机输出 ,则表明存在一些严重的设计或数据建模问题。即使您可以存储解析后的数据,也很难使用它。
我有 UDF
每行处理 JSON 和 returns 动态数据结果。在我的例子中,我需要它来验证数据和 return 验证数据。
架构对于每一行都是灵活的。这意味着我无法为每个案例创建 case class
(我的一些数据可以嵌套)。
我尝试从我的 UDF 函数中 return 元组,但我也没有成功(因为我需要从列表转换为元组),而且我没有找到一个优雅的解决方案为此。
我正在 return 的数据类型是 String
、Integer
、Double
、DateTime
,顺序不同。
我尝试在 DataFrame 上使用 map
,但我的架构有问题。
import spark.implicits._
def processData(row_type: String) = {
/*
completely random output here. Tuple/List/Array of
elements with a type Integer, String, Double, DateType.
*/
// pseudo-code starts here
if row_type == A
(1, "second", 3)
else
(1, "second", 3, 4)
}
val processDataUDF = udf((row_type: String) => processData(row_type))
val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()
结果
+------------+
| UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+
我该如何解决这个问题?每个 row_type
我们有不同的处理结果。所有 row_type
都是动态设置的。我可以为每个 row_type
很好地 Schema
,但我不能用不同的模式生成相同的 UDF return 结果。
这里使用 map
是唯一的方法吗?
Spark Dataset
是一个列式数据结构,这里真的没有地方放灵活的模式。架构必须是同类的(所有行必须具有相同的一般结构)并且预先已知(如果您使用 UDF,它必须 return 明确定义 SQL 类型)。
您可以通过以下方式实现一些灵活性:
- 正在定义表示所有可能字段的超集的架构,并将各个列标记为
nullable
。这只有在没有类型冲突时才有可能(如果Row
包含字段foo
,它总是使用相同的 SQL 类型表示)。 - 使用集合类型(
MapType
、ArrayType
)来表示可变大小的字段。所有值和/或键必须是同一类型。 - 将原始数据重塑到可以用固定模式实际表示的程度。 Spark 包含
json4s
, which provides a set of tools for merging, diffing and querying JSON 数据作为其依赖项。如果需要,它可用于应用相对复杂的转换。
如果这不切实际,我建议保留 JSON 字段 "as is" 并仅解析它 on-demand 以提取特定值。您可以使用 get_json_object
和显式类型转换。这允许测试不同的场景:
coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
.map(get_json_object($"json_col", _)): _*).cast(DoubleType)
不假设单个文档结构。
您可以使用二进制 Encoders
(Encoders.kryo
, Encoders.java
) 或 RDD
API 获得更大的灵活性,它们可用于存储联合类型(甚至 Any
),但如果您真的希望 完全随机输出 ,则表明存在一些严重的设计或数据建模问题。即使您可以存储解析后的数据,也很难使用它。