Return 来自 UDF pyspark 的复杂嵌套数组类型
Return complex nested array type from UDF pyspark
最后更新了更多问题post
我需要在 pyspark 中使用 UDF 为 df 创建新列。 UDF 必须 return 嵌套数组,格式为:
[
[before], [after], [from_tbl], [where_tbl], [to_tbl], [lst_tbl], [db_info]
]
with:
-----------------
before, after = [
[query_type,out,[from],[where]],
[query_type,out,[from],[where]]
]
-----------------
to_tbl = [write_mode, [table_name], table_action]
-----------------
from_tbl, where_tbl, from, where, table_name, lst_tbl, db_info = [a,b,c]
我从 UDF 定义架构 return 例如:
schema_return = T.StructType([
T.StructField('before', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('after', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('to_tbl', T.StructType([
T.StructField('write_mode', T.StringType(), True),
T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
T.StructField('table_action', T.StringType(), True),
]), True),
T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('db_info', T.ArrayType(T.StringType(), True), True)
])
@F.udf(returnType=schema_return)
def udf(parameter):
...
我收到一个错误:
原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数。需要 7 个字段,但提供了 0 个值。
我按照本教程进行操作:https://prodevsblog.com/questions/123979/how-to-return-a-tuple-type-in-a-udf-in-pyspark/
举例:
schema = StructType([
StructField("min", FloatType(), True),
StructField("size", IntegerType(), True),
StructField("edges", ArrayType(FloatType()), True),
StructField("val_to_index", MapType(FloatType(), IntegerType()), True)
# StructField('insanity', StructType([StructField("min_", FloatType(), True), StructField("size_", IntegerType(), True)]))
])
def func(values):
mn = min(values)
size = len(values)
lst = sorted(values)[::-1]
val_to_index = {x: i for i, x in enumerate(values)}
return (mn, size, lst, val_to_index)
func = udf(func, schema)
dff = df.select('*', func('y[]').alias('complex_type'))
dff.show(10, False)
# +---+----------+------------------------------------------------------+
# |x |y[] |complex_type |
# +---+----------+------------------------------------------------------+
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]|
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]|
# +---+----------+------------------------------------------------------+
我哪里错了?以及如何为上面的嵌套数组定义架构。
这是我的 UDF return
return [before, after, from_tbl, where_tbl, to_tbl, list(set(lst_tbl)), dbinfo]
or
return [] # maybe this is cause
更新更多
@mck 之后不要跟我说话 return[]。我将 return [] 替换为 return None。但是我收到了与第一个错误相同的更多错误,例如:
原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数。需要 3 个字段,但提供了 0 个值
有架构
schema_return = T.StructType([
T.StructField('before', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('after', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('to_tbl', T.StructType([
T.StructField('write_mode', T.StringType(), True),
T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
T.StructField('table_action', T.StringType(), True),
]), True),
T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('db_info', T.ArrayType(T.StringType(), True), True)
])
基于错误值的数量 = 3。我猜原因来自
T.StructField('to_tbl', T.StructType([
T.StructField('write_mode', T.StringType(), True),
T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
T.StructField('table_action', T.StringType(), True),
]), True),
我的列表:[之前]、[之后]、[from_tbl]、[where_tbl]、[to_tbl]、[lst_tbl], [db_info] 如果条件不满足,则嵌套元素 = []。如果我将 [] 替换为 None。它影响到最后的逻辑代码。我怎样才能保留 [] 而不是 None。为什么这会导致错误
非常感谢
我做到了。如果 return 结构类型。需要returnNone。不 return [] 包括嵌套元素。非常感谢@mck
最后更新了更多问题post
我需要在 pyspark 中使用 UDF 为 df 创建新列。 UDF 必须 return 嵌套数组,格式为:
[
[before], [after], [from_tbl], [where_tbl], [to_tbl], [lst_tbl], [db_info]
]
with:
-----------------
before, after = [
[query_type,out,[from],[where]],
[query_type,out,[from],[where]]
]
-----------------
to_tbl = [write_mode, [table_name], table_action]
-----------------
from_tbl, where_tbl, from, where, table_name, lst_tbl, db_info = [a,b,c]
我从 UDF 定义架构 return 例如:
schema_return = T.StructType([
T.StructField('before', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('after', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('to_tbl', T.StructType([
T.StructField('write_mode', T.StringType(), True),
T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
T.StructField('table_action', T.StringType(), True),
]), True),
T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('db_info', T.ArrayType(T.StringType(), True), True)
])
@F.udf(returnType=schema_return)
def udf(parameter):
...
我收到一个错误:
原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数。需要 7 个字段,但提供了 0 个值。
我按照本教程进行操作:https://prodevsblog.com/questions/123979/how-to-return-a-tuple-type-in-a-udf-in-pyspark/ 举例:
schema = StructType([
StructField("min", FloatType(), True),
StructField("size", IntegerType(), True),
StructField("edges", ArrayType(FloatType()), True),
StructField("val_to_index", MapType(FloatType(), IntegerType()), True)
# StructField('insanity', StructType([StructField("min_", FloatType(), True), StructField("size_", IntegerType(), True)]))
])
def func(values):
mn = min(values)
size = len(values)
lst = sorted(values)[::-1]
val_to_index = {x: i for i, x in enumerate(values)}
return (mn, size, lst, val_to_index)
func = udf(func, schema)
dff = df.select('*', func('y[]').alias('complex_type'))
dff.show(10, False)
# +---+----------+------------------------------------------------------+
# |x |y[] |complex_type |
# +---+----------+------------------------------------------------------+
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]|
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]|
# +---+----------+------------------------------------------------------+
我哪里错了?以及如何为上面的嵌套数组定义架构。
这是我的 UDF return
return [before, after, from_tbl, where_tbl, to_tbl, list(set(lst_tbl)), dbinfo]
or
return [] # maybe this is cause
更新更多
@mck 之后不要跟我说话 return[]。我将 return [] 替换为 return None。但是我收到了与第一个错误相同的更多错误,例如:
原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数。需要 3 个字段,但提供了 0 个值
有架构
schema_return = T.StructType([
T.StructField('before', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('after', T.ArrayType(T.StructType([
T.StructField('query_type', T.StringType(), True),
T.StructField('out', T.StringType(), True),
T.StructField('from', T.ArrayType(T.StringType(), True), True),
T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('to_tbl', T.StructType([
T.StructField('write_mode', T.StringType(), True),
T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
T.StructField('table_action', T.StringType(), True),
]), True),
T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('db_info', T.ArrayType(T.StringType(), True), True)
])
基于错误值的数量 = 3。我猜原因来自
T.StructField('to_tbl', T.StructType([
T.StructField('write_mode', T.StringType(), True),
T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
T.StructField('table_action', T.StringType(), True),
]), True),
我的列表:[之前]、[之后]、[from_tbl]、[where_tbl]、[to_tbl]、[lst_tbl], [db_info] 如果条件不满足,则嵌套元素 = []。如果我将 [] 替换为 None。它影响到最后的逻辑代码。我怎样才能保留 [] 而不是 None。为什么这会导致错误 非常感谢
我做到了。如果 return 结构类型。需要returnNone。不 return [] 包括嵌套元素。非常感谢@mck