Pandas UDF throws "result vector was not the required length" error
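I have a Delta table containing Thrift data that comes from Kafka, and I am deserializing it with a UDF. A regular UDF works without problems, but I get an error when I try to use a Pandas UDF.
This works fine, i.e. the regular UDF: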
    def decoder(thrift_data):
        # Load the Thrift schema and build an empty struct to decode into
        schema_file = thriftpy2.load("/dbfs/FileStore/schema_file.thrift")
        schema = schema_file.SchemaClass()
        # Decode one binary payload (one row) and convert it to JSON
        decoded_payload = deserialize(schema, thrift_data, TCyBinaryProtocolFactory())
        json_data = proto.struct_to_json(decoded_payload)
        return json.dumps(json_data)

    decoder_udf = udf(decoder, StringType())
    data = spark.sql("""SELECT value FROM data_table""")
    data = data.withColumn('decoded_json', decoder_udf(data.value))
But when I use a Pandas UDF:
    def decoder(thrift_data: pd.Series) -> pd.Series:
        schema_file = thriftpy2.load("/dbfs/FileStore/schema_file.thrift")
        schema = schema_file.SchemaClass()
        decoded_payload = deserialize(schema, thrift_data, TCyBinaryProtocolFactory())
        json_data = proto.struct_to_json(decoded_payload)
        return json.dumps(json_data)

    decoder_udf = pandas_udf(decoder, returnType=StringType())
    data = spark.sql("""SELECT value FROM data_table""")
    data = data.withColumn('decoded_json', decoder_udf(data.value))
I get the error PythonException: 'RuntimeError: Result vector from pandas_udf was not the required length: expected 5000, got 651'.
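I figured out the solution: the function has to return its output as a pandas Series, with one element per input row, instead of a single string.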
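For anyone hitting the same error, here is a minimal sketch of that fix. A Series-to-Series Pandas UDF receives a whole batch of rows as one `pd.Series` and must return a `pd.Series` of the same length, whereas `json.dumps(...)` returns a single string (the 651 in the message is plausibly the length of that string, though I have not verified this). To keep the sketch runnable without Spark or thriftpy2, `decode_one` is a hypothetical stand-in for the per-record Thrift decoding above, not the real deserialization.

```python
import json

import pandas as pd


def decode_one(raw: bytes) -> str:
    # Hypothetical stand-in for the per-record logic in the question
    # (deserialize + proto.struct_to_json); replace with the real decoding.
    return json.dumps({"payload": raw.decode("utf-8")})


def decoder(thrift_data: pd.Series) -> pd.Series:
    # The batch arrives as a Series, so decode element by element and
    # return a Series with exactly one result per input row.
    return thrift_data.apply(decode_one)


# Local check on a small batch, no Spark session needed.
batch = pd.Series([b"a", b"b", b"c"])
result = decoder(batch)
print(len(result) == len(batch))
```

In Spark the function would be wrapped exactly as in the question, with `pandas_udf(decoder, returnType=StringType())`. One side benefit of the batch form is that the schema file only needs to be loaded once per batch rather than once per row.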