How to make Spark streams execute sequentially
Question
I have a job that runs two streams in total, but I want the second one to start only after the first has finished, because the first stream saves the events it reads into a Delta table that is then used as the input of the second stream. The problem is that whatever the first stream adds is not available to the second stream within the current notebook run, because both streams start at the same time.
Is there a way to make these queries run sequentially while still executing them from the same notebook run?
I tried the awaitTermination function, but found that it does not solve my problem. Some pseudocode:
def main():
    # Read events from Event Hubs
    metricbeat_df = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events
    metricbeat_df.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events")

    # Parse events
    metricbeat_df = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")

    # *Do some transformations here*

    metricbeat_df.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events")
TL;DR
To summarize the problem: when I run the code above, query1 and query2 start at the same time, which causes my_db.joined_bronze_events to lag slightly behind my_db.raw_events, because the records added by query1 are not available to query2 in the current run (they will be in the next run, of course).
Is there a way to force query2 not to start until query1 has finished, while still running both in the same notebook?
Answer
When you use the Trigger.once option, you can use the processAllAvailable method on the StreamingQuery:
def main():
    # Read events from Event Hubs
    # note that the variable name has changed to metricbeat_df1
    metricbeat_df1 = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events; .table() starts the query and returns a StreamingQuery,
    # so processAllAvailable() blocks until this write has finished
    metricbeat_df1.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events") \
        .processAllAvailable()

    # Parse events
    # note that the variable name has changed to metricbeat_df2
    metricbeat_df2 = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")

    # *Do some transformations here*

    metricbeat_df2.writeStream \
        .trigger(once=True) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events") \
        .processAllAvailable()
Note that I have changed the DataFrame names, since they should not be the same for the two streaming queries.
The processAllAvailable method is documented as follows:
"Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a org.apache.spark.sql.execution.streaming.Source prior to invocation. (i.e. getOffset must immediately reflect the addition)."