Create Spark output streams with a function
I'm using Databricks Auto Loader to ingest files containing data with different schemas, and I want to write them to the corresponding Delta tables in update mode.
There can be many (>15) different message types in the stream, so I would have to write an output stream for each of them. Each table has its own "upsert" function.
Can this be condensed with a function (example below) to save a few keystrokes?
upload_path = '/example'
# Set up the stream to begin reading incoming files from the
# upload_path location.
df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.format', 'avro') \
    .load(upload_path)
# filter messages and apply JSON schema
table1_df = filter_and_transform(df, json_schema1)
table2_df = filter_and_transform(df, json_schema2)
table3_df = filter_and_transform(df, json_schema3)
# each table has its own upsert function
def create_output_stream(df, table_name, upsert_function):
    # Create the stream and return it.
    return df.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .foreachBatch(upsert_function) \
        .queryName(f"autoLoader_query_{table_name}") \
        .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}") \
        .outputMode("update")
output_stream1 = create_output_stream(table1_df, "table_name1", upsert_function1).start() # start stream in outer environment
output_stream2 = create_output_stream(table2_df, "table_name2", upsert_function2).start()
output_stream3 = create_output_stream(table3_df, "table_name3", upsert_function3).start()
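For context, filter_and_transform and the per-table upsert functions aren't shown here; a minimal sketch of what they might look like, assuming the Avro records carry a messageType column and a JSON body string, and that each upsert is a Delta MERGE keyed on a hypothetical id column (all of these names are placeholders, not part of the question):

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def filter_and_transform(df, json_schema, message_type='type1'):
    # Keep only rows of one message type and parse the JSON payload with
    # that type's schema ('messageType' and 'body' are assumed column names).
    return df.filter(F.col('messageType') == message_type) \
        .withColumn('payload', F.from_json(F.col('body'), json_schema)) \
        .select('payload.*')

def upsert_function1(batch_df, batch_id):
    # foreachBatch handler: MERGE the micro-batch into the target Delta table.
    target = DeltaTable.forName(spark, 'table_name1')
    target.alias('t') \
        .merge(batch_df.alias('s'), 't.id = s.id') \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()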
Yes, of course you can do it this way - it's a pretty standard pattern.
But there is one thing to keep in mind: if your input data isn't partitioned by message type, you will scan the same files multiple times (once per message type). An alternative is to do the filtering and upserts for all message types inside a single foreachBatch, something like this:
df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.format', 'avro') \
    .load(upload_path)

def do_all_upserts(df, epoch):
    df.cache()
    table1_df = filter_and_transform(df, json_schema1)
    table2_df = filter_and_transform(df, json_schema2)
    table3_df = filter_and_transform(df, json_schema3)
    # really you can run multiple writes using multithreading, or something like it
    do_upsert(table1_df)
    do_upsert(table2_df)
    ...
    # free resources
    df.unpersist()
df.writeStream \
    .trigger(once=True) \
    .format("delta") \
    .foreachBatch(do_all_upserts) \
    .option("checkpointLocation", "dbfs:/delta/somepath/all_tables")  # single checkpoint for the combined stream (placeholder path) \
    .start()
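As a follow-up to the multithreading comment above, a minimal sketch of do_all_upserts that runs the per-table upserts concurrently on the driver with a thread pool, reusing the hypothetical per-table upsert functions sketched earlier (which take a (batch_df, batch_id) pair):

from concurrent.futures import ThreadPoolExecutor

def do_all_upserts(df, epoch):
    df.cache()  # cache once so every filter reuses the same micro-batch
    work = [
        (filter_and_transform(df, json_schema1), upsert_function1),
        (filter_and_transform(df, json_schema2), upsert_function2),
        (filter_and_transform(df, json_schema3), upsert_function3),
    ]
    # Spark actions can be submitted from several driver threads, so the
    # per-table upserts run concurrently instead of one after another.
    with ThreadPoolExecutor(max_workers=len(work)) as pool:
        futures = [pool.submit(fn, table_df, epoch) for table_df, fn in work]
        for f in futures:
            f.result()  # re-raise any failure so the whole batch fails and is retried
    df.unpersist()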