我可以 "branch" 流入许多并在 pyspark 中并行写入它们吗?
Can I "branch" stream into many and write them in parallel in pyspark?
我正在 pyspark 中接收 Kafka 流。目前我按一组字段对其进行分组并将更新写入数据库:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", topic)
...
df = df \
.groupBy("myfield1") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
query = df \
.writeStream \
.outputMode("update") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
.start()
query.awaitTermination()
我可以在中间使用相同的链并创建另一个分组吗
df2 = df \
.groupBy("myfield2") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
并将它的输出并行写入不同的地方?
在哪里调用 writeStream
和 awaitTermination
?
是的,您可以将 Kafka 输入流分支为任意数量的流式查询。
您需要考虑以下几点:
query.awaitTermination
是一种阻塞方法,这意味着您在此方法之后编写的任何代码都不会执行,直到此 query
终止。
- 每个“分支”流式查询将 运行 并行进行,并且在它们的每个 writeStream 调用中定义一个检查点位置很重要。
总的来说,您的代码需要具有以下结构:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", topic) \
.[...]
# note that I changed the variable name to "df1"
df1 = df \
.groupBy("myfield1") \
.[...]
df2 = df \
.groupBy("myfield2") \
.[...]
query1 = df1 \
.writeStream \
.outputMode("update") \
.option("checkpointLocation", "/tmp/checkpointLoc1") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df1, epoch)) \
.start()
query2 = df2 \
.writeStream \
.outputMode("update") \
.option("checkpointLocation", "/tmp/checkpointLoc2") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df2, epoch)) \
.start()
spark.streams.awaitAnyTermination
补充说明:在您显示的代码中,您正在覆盖 df
,因此 df2
的推导可能无法获得预期的结果。
我正在 pyspark 中接收 Kafka 流。目前我按一组字段对其进行分组并将更新写入数据库:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", topic)
...
df = df \
.groupBy("myfield1") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
query = df \
.writeStream \
.outputMode("update") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
.start()
query.awaitTermination()
我可以在中间使用相同的链并创建另一个分组吗
df2 = df \
.groupBy("myfield2") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
并将它的输出并行写入不同的地方?
在哪里调用 writeStream
和 awaitTermination
?
是的,您可以将 Kafka 输入流分支为任意数量的流式查询。
您需要考虑以下几点:
query.awaitTermination
是一种阻塞方法,这意味着您在此方法之后编写的任何代码都不会执行,直到此query
终止。- 每个“分支”流式查询将 运行 并行进行,并且在它们的每个 writeStream 调用中定义一个检查点位置很重要。
总的来说,您的代码需要具有以下结构:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", topic) \
.[...]
# note that I changed the variable name to "df1"
df1 = df \
.groupBy("myfield1") \
.[...]
df2 = df \
.groupBy("myfield2") \
.[...]
query1 = df1 \
.writeStream \
.outputMode("update") \
.option("checkpointLocation", "/tmp/checkpointLoc1") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df1, epoch)) \
.start()
query2 = df2 \
.writeStream \
.outputMode("update") \
.option("checkpointLocation", "/tmp/checkpointLoc2") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df2, epoch)) \
.start()
spark.streams.awaitAnyTermination
补充说明:在您显示的代码中,您正在覆盖 df
,因此 df2
的推导可能无法获得预期的结果。