如何使用 UDF 处理大增量 table?

How to process a large delta table with UDF?

我有一个包含大约 3000 亿行的增量 table。现在我正在使用 UDF 对一个列执行一些操作并创建另一个列

我的代码是这样的

def my_udf(data):
    return pass
       
 
udf_func = udf(my_udf, StringType())
data = spark.sql("""SELECT * FROM large_table """)
data = data.withColumn('new_column', udf_func(data.value))

现在的问题是这需要很长时间,因为 Spark 将处理所有 3000 亿行,然后写入输出。有没有办法我们可以做一些 Mirco 批处理并定期将这些输出写入输出增量 table

通常的第一条规则是尽可能避免使用 UDF - 您需要执行哪种转换在 Spark 本身中不可用?

第二条规则 - 如果你不能避免使用 UDF,至少使用 Pandas UDFs 批量处理数据,并且没有那么大的 serialization/deserialization 开销 - 通常的 UDF 正在处理数据逐行编码和解码数据。

如果您的 table 是在一段时间内构建的,并且包含许多文件,您可以尝试将 Spark Structured Streaming 与 Trigger.AvailableNow 一起使用(需要 DBR 10.3 或 10.4),如下所示:

maxNumFiles = 10 # max number of parquet files processed at once
df = spark.readStream \
  .option("maxFilesPerTrigger", maxNumFiles) \ 
  .table("large_table")
df = df.withColumn('new_column', udf_func(data.value))
df.writeStream \
  .option("checkpointLocation", "/some/path") \
  .trigger(availableNow=True) \
  .toTable("my_destination_table")

这将逐块读取源 table,应用您的转换,并将数据写入目标 table。