如何每批次执行操作?
How to execute operations every batch?
我正在尝试从 kafka 主题中读取,进行一些操作,然后将 df 写入磁盘,例如:
df_alarmsFromKafka=spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", ip) \
.option("subscribe", topic) \
.option("request.timeout.ms",80000) \
.option("includeHeaders", "true") \
.load()
df_alarmsFromKafka=df_alarmsFromKafka.drop("test")
print("ran only once, not in the stream")
batch_job=df_alarmsFromKafka.writeStream \
.format("parquet").outputMode("append") \
.option("path",path) \
.option("checkpointLocation",cp) \
.start()
batch_job.awaitTermination()
我遇到的问题是,只有对 df_alarmsFromKafka 的操作是每批 运行。
例如,如果我想在每批次中评估一个简单的打印件,这似乎是不可能的,因为它显然只在第一次打印和评估时才被打印出来。
是否有不同的方法使我能够在批次之间进行其他操作,而不仅仅是那些与 Dataframe.writeStream
评估严格相关的操作?
啊,我想我明白你的意思 “如果我想每批次评估一个简单的印刷品”。您似乎在要求 foreach operator:
Sets the output of the streaming query to be processed using the provided writer f. This is often used to write the output of a streaming query to arbitrary storage systems.
伪代码如下所示:
df_alarmsFromKafka = df_alarmsFromKafka.drop("test")
df_alarmsFromKafka.foreach(print("runs every batch, in the stream"))
batch_job=df_alarmsFromKafka.writeStream \
.format("parquet").outputMode("append") \
.option("path",path) \
.option("checkpointLocation",cp) \
.start()
以上实际上会启动 2 个流式查询(对于一个输入)。
我正在尝试从 kafka 主题中读取,进行一些操作,然后将 df 写入磁盘,例如:
df_alarmsFromKafka=spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", ip) \
.option("subscribe", topic) \
.option("request.timeout.ms",80000) \
.option("includeHeaders", "true") \
.load()
df_alarmsFromKafka=df_alarmsFromKafka.drop("test")
print("ran only once, not in the stream")
batch_job=df_alarmsFromKafka.writeStream \
.format("parquet").outputMode("append") \
.option("path",path) \
.option("checkpointLocation",cp) \
.start()
batch_job.awaitTermination()
我遇到的问题是,只有对 df_alarmsFromKafka 的操作是每批 运行。
例如,如果我想在每批次中评估一个简单的打印件,这似乎是不可能的,因为它显然只在第一次打印和评估时才被打印出来。
是否有不同的方法使我能够在批次之间进行其他操作,而不仅仅是那些与 Dataframe.writeStream
评估严格相关的操作?
啊,我想我明白你的意思 “如果我想每批次评估一个简单的印刷品”。您似乎在要求 foreach operator:
Sets the output of the streaming query to be processed using the provided writer f. This is often used to write the output of a streaming query to arbitrary storage systems.
伪代码如下所示:
df_alarmsFromKafka = df_alarmsFromKafka.drop("test")
df_alarmsFromKafka.foreach(print("runs every batch, in the stream"))
batch_job=df_alarmsFromKafka.writeStream \
.format("parquet").outputMode("append") \
.option("path",path) \
.option("checkpointLocation",cp) \
.start()
以上实际上会启动 2 个流式查询(对于一个输入)。