我可以 "branch" 流入许多并在 pyspark 中并行写入它们吗?

Can I "branch" stream into many and write them in parallel in pyspark?

我正在 pyspark 中接收 Kafka 流。目前我按一组字段对其进行分组并将更新写入数据库:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic)

...

df = df \
        .groupBy("myfield1") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")

query = df \
        .writeStream \
        .outputMode("update") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
        .start()

query.awaitTermination()

我可以在中间使用相同的链并创建另一个分组吗

df2 = df \
        .groupBy("myfield2") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")

并将它的输出并行写入不同的地方?

在哪里调用 writeStreamawaitTermination

是的,您可以将 Kafka 输入流分支为任意数量的流式查询。

您需要考虑以下几点:

  1. query.awaitTermination 是一种阻塞方法,这意味着您在此方法之后编写的任何代码都不会执行,直到此 query 终止。
  2. 每个“分支”流式查询将 运行 并行进行,并且在它们的每个 writeStream 调用中定义一个检查点位置很重要。

总的来说,您的代码需要具有以下结构:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic) \
        .[...]

# note that I changed the variable name to "df1"
df1 = df \
    .groupBy("myfield1") \
    .[...]

df2 = df \
    .groupBy("myfield2") \
    .[...]


query1 = df1 \
        .writeStream \
        .outputMode("update") \
        .option("checkpointLocation", "/tmp/checkpointLoc1") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df1, epoch)) \
        .start()

query2 = df2 \
        .writeStream \
        .outputMode("update") \
        .option("checkpointLocation", "/tmp/checkpointLoc2") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df2, epoch)) \
        .start()

spark.streams.awaitAnyTermination

补充说明:在您显示的代码中,您正在覆盖 df,因此 df2 的推导可能无法获得预期的结果。