PySpark UDF using ElementTree returns Pickling Error

I have the following dataframe:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from lxml import etree

data=[
  (123,1,"string123","string456","string789")]
 
importSchema=(T.StructType([
  T.StructField("field1",T.IntegerType(),True),
  T.StructField("field2",T.IntegerType(),True),
  T.StructField("field3",T.StringType(), True),
  T.StructField("field4",T.StringType(),True),
  T.StructField("field5",T.StringType(),True)
]))  
  
df=spark.createDataFrame(data=data,schema=importSchema)

I am trying to create a UDF that takes the value from each field and builds an XML string using etree.

root = etree.Element("root")  # built on the driver, outside the UDF

def create_str(field1,field2,field3,field4,field5):
    outer = etree.SubElement(root, 'outer')
    field1s = etree.SubElement(outer, field1)
    field2s = etree.SubElement(outer, field2)
    field3s = etree.SubElement(outer, field3)
    field4s = etree.SubElement(outer, field4)
    field5s = etree.SubElement(outer, field5)
    field1s.text = field1
    field2s.text = field2
    field3s.text = field3
    field4s.text = field4
    field5s.text = field5
    
    var=etree.tostring(root, pretty_print=True).decode('utf-8')
    
    return var
  
udf_create_str = F.udf(create_str)

df.withColumn("output", udf_create_str(df.field1,df.field2,df.field3,df.field4,df.field5)).show()

However, this returns:

PicklingError: Could not serialize object: TypeError: cannot pickle 'lxml.etree._Element' object

How do I get the etree.tostring() value into a temporary variable or column?
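For context on the error itself: Spark pickles a UDF together with everything its closure references, and here the closure captures root, an lxml _Element that cannot be pickled. A minimal sketch of the safe pattern, with every lxml object created inside the function (the to_xml name is just for illustration):

from lxml import etree
import pyspark.sql.functions as F

def to_xml(value):
    # All lxml objects are local to the function, so the pickled
    # closure contains nothing that cannot be serialized.
    root = etree.Element("root")
    child = etree.SubElement(root, "value")
    child.text = str(value)  # element text must be a string, not an int
    return etree.tostring(root).decode("utf-8")

udf_to_xml = F.udf(to_xml)  # serializes cleanly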

I think the main problem is that you have int values that need to be converted to str (and that root is built outside the UDF, which is what triggers the PicklingError in the first place).

Here is my attempt:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from xml.etree import ElementTree as etree  # standard library instead of lxml


def create_str(fields):
    # Build the whole tree inside the UDF so the closure stays picklable.
    root = etree.Element("root")
    outer = etree.SubElement(root, "outer")
    field_names = ["field1", "field2", "field3", "field4", "field5"]
    for field, field_name in zip(fields, field_names):
        field_tmp = etree.SubElement(outer, field_name)
        field_tmp.text = str(field)  # cast ints to str for element text
    return etree.tostring(root, encoding="utf-8").decode("utf-8")


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    data = [(123, 1, "string123", "string456", "string789")]

    importSchema = T.StructType(
        [
            T.StructField("field1", T.IntegerType(), True),
            T.StructField("field2", T.IntegerType(), True),
            T.StructField("field3", T.StringType(), True),
            T.StructField("field4", T.StringType(), True),
            T.StructField("field5", T.StringType(), True),
        ]
    )

    df = spark.createDataFrame(data=data, schema=importSchema)
    udf_create_str = F.udf(create_str)
    df.withColumn(
        "output",
        udf_create_str(F.array(df.field1, df.field2, df.field3, df.field4, df.field5)),
    ).show(20, False)

Result:

+------+------+---------+---------+---------+------------------------------------------------------------------------------------------------------------------------------------------------+
|field1|field2|field3   |field4   |field5   |output                                                                                                                                          |
+------+------+---------+---------+---------+------------------------------------------------------------------------------------------------------------------------------------------------+
|123   |1     |string123|string456|string789|<root><outer><field1>123</field1><field2>1</field2><field3>string123</field3><field4>string456</field4><field5>string789</field5></outer></root>|
+------+------+---------+---------+---------+------------------------------------------------------------------------------------------------------------------------------------------------+
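
One note on the design choice: F.array needs a common element type, so Spark implicitly casts the two integer columns to strings before the UDF ever sees them (the str(field) cast then keeps the function safe regardless). If you would rather keep each field's original type on the way in, F.struct is an alternative; a sketch under that assumption (the UDF receives a Row, which iterates in field order; udf_create_str_struct is just an illustrative name):

udf_create_str_struct = F.udf(lambda row: create_str(list(row)))
df.withColumn(
    "output",
    udf_create_str_struct(
        F.struct(df.field1, df.field2, df.field3, df.field4, df.field5)
    ),
).show(20, False)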