Create Spark output streams with a function

I'm using Databricks Auto Loader to ingest files containing data with different schemas, and I want to write them to the corresponding Delta tables in update mode.

There can be many (>15) different message types in the stream, so I have to create an output stream for each of them. Each table has its own "upsert" function.

Can this be condensed with a function (example below) to save a few keystrokes?

upload_path = '/example'

# Set up the stream to begin reading incoming files from the
# upload_path location.
df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'avro') \
  .load(upload_path)

# filter messages and apply JSON schema
table1_df = filter_and_transform(df, json_schema1)
table2_df = filter_and_transform(df, json_schema2)
table3_df = filter_and_transform(df, json_schema3)

# each table has its own upsert function
def create_output_stream(df, table_name, upsert_function):
    # Create the stream and return it (not yet started).
    return df.writeStream.format('delta') \
        .trigger(once=True) \
        .foreachBatch(upsert_function) \
        .queryName(f"autoLoader_query_{table_name}") \
        .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}") \
        .outputMode("update")

output_stream1 = create_output_stream(table1_df, "table_name1", upsert_function1).start() # start stream in outer environment
output_stream2 = create_output_stream(table2_df, "table_name2", upsert_function2).start()
output_stream3 = create_output_stream(table3_df, "table_name3", upsert_function3).start()
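The per-table upsert functions aren't shown above; as a rough illustration, such a foreachBatch upsert typically does a Delta MERGE into the target table. A minimal sketch, assuming a hypothetical id key and table path:

from delta.tables import DeltaTable

# Hypothetical upsert for table_name1: merge each micro-batch into the
# target Delta table on a key column (adjust path and join key as needed).
def upsert_function1(batch_df, batch_id):
    target = DeltaTable.forPath(spark, "dbfs:/delta/tables/table_name1")
    (target.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())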


Yes, of course you can do it this way - it's a very standard pattern.

But there is one thing you need to take into account - if your input data isn't partitioned by message type, you will scan the same files multiple times (once per message type). An alternative could be the following - perform the filtering and upserts for all message types inside a single foreachBatch, like this:

df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'avro') \
  .load(upload_path)

def do_all_upserts(df, epoch):
  df.cache()
  table1_df = filter_and_transform(df, json_schema1)
  table2_df = filter_and_transform(df, json_schema2)
  table3_df = filter_and_transform(df, json_schema3)
  # the writes can also be run in parallel, e.g. with multithreading
  do_upsert(table1_df)
  do_upsert(table2_df)
  ...
  # free resources
  df.unpersist()

df.writeStream.format('delta') \
  .trigger(once=True) \
  .foreachBatch(do_all_upserts) \
  .option("checkpointLocation", "dbfs:/delta/somepath/combined") \
  .start()
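As noted in the comment above, the per-table writes inside the batch function can also run in parallel. A rough sketch using a thread pool, assuming the same hypothetical filter_and_transform and do_upsert helpers:

from concurrent.futures import ThreadPoolExecutor

def do_all_upserts(df, epoch):
  # cache the micro-batch so each filter/upsert reuses it instead of
  # recomputing the source
  df.cache()
  tables = [
      filter_and_transform(df, json_schema1),
      filter_and_transform(df, json_schema2),
      filter_and_transform(df, json_schema3),
  ]
  # run the per-table upserts concurrently; each one launches its own
  # Spark jobs, so they can overlap on the cluster
  with ThreadPoolExecutor(max_workers=len(tables)) as pool:
      list(pool.map(do_upsert, tables))
  # free resources
  df.unpersist()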