如何使用 UDF 处理大增量 table?
How to process a large delta table with UDF?
我有一个包含大约 3000 亿行的增量 table。现在我正在使用 UDF 对一个列执行一些操作并创建另一个列
我的代码是这样的
def my_udf(data):
return pass
udf_func = udf(my_udf, StringType())
data = spark.sql("""SELECT * FROM large_table """)
data = data.withColumn('new_column', udf_func(data.value))
现在的问题是这需要很长时间,因为 Spark 将处理所有 3000 亿行,然后写入输出。有没有办法我们可以做一些 Mirco 批处理并定期将这些输出写入输出增量 table
通常的第一条规则是尽可能避免使用 UDF - 您需要执行哪种转换在 Spark 本身中不可用?
第二条规则 - 如果你不能避免使用 UDF,至少使用 Pandas UDFs 批量处理数据,并且没有那么大的 serialization/deserialization 开销 - 通常的 UDF 正在处理数据逐行编码和解码数据。
如果您的 table 是在一段时间内构建的,并且包含许多文件,您可以尝试将 Spark Structured Streaming 与 Trigger.AvailableNow
一起使用(需要 DBR 10.3 或 10.4),如下所示:
maxNumFiles = 10 # max number of parquet files processed at once
df = spark.readStream \
.option("maxFilesPerTrigger", maxNumFiles) \
.table("large_table")
df = df.withColumn('new_column', udf_func(data.value))
df.writeStream \
.option("checkpointLocation", "/some/path") \
.trigger(availableNow=True) \
.toTable("my_destination_table")
这将逐块读取源 table,应用您的转换,并将数据写入目标 table。
我有一个包含大约 3000 亿行的增量 table。现在我正在使用 UDF 对一个列执行一些操作并创建另一个列
我的代码是这样的
def my_udf(data):
return pass
udf_func = udf(my_udf, StringType())
data = spark.sql("""SELECT * FROM large_table """)
data = data.withColumn('new_column', udf_func(data.value))
现在的问题是这需要很长时间,因为 Spark 将处理所有 3000 亿行,然后写入输出。有没有办法我们可以做一些 Mirco 批处理并定期将这些输出写入输出增量 table
通常的第一条规则是尽可能避免使用 UDF - 您需要执行哪种转换在 Spark 本身中不可用?
第二条规则 - 如果你不能避免使用 UDF,至少使用 Pandas UDFs 批量处理数据,并且没有那么大的 serialization/deserialization 开销 - 通常的 UDF 正在处理数据逐行编码和解码数据。
如果您的 table 是在一段时间内构建的,并且包含许多文件,您可以尝试将 Spark Structured Streaming 与 Trigger.AvailableNow
一起使用(需要 DBR 10.3 或 10.4),如下所示:
maxNumFiles = 10 # max number of parquet files processed at once
df = spark.readStream \
.option("maxFilesPerTrigger", maxNumFiles) \
.table("large_table")
df = df.withColumn('new_column', udf_func(data.value))
df.writeStream \
.option("checkpointLocation", "/some/path") \
.trigger(availableNow=True) \
.toTable("my_destination_table")
这将逐块读取源 table,应用您的转换,并将数据写入目标 table。