如何每批次执行操作?

How to execute operations every batch?

我正在尝试从 kafka 主题中读取,进行一些操作,然后将 df 写入磁盘,例如:

df_alarmsFromKafka=spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", ip) \
.option("subscribe", topic) \
.option("request.timeout.ms",80000) \
.option("includeHeaders", "true") \
.load() 

df_alarmsFromKafka=df_alarmsFromKafka.drop("test")
print("ran only once, not in the stream")

batch_job=df_alarmsFromKafka.writeStream \
    .format("parquet").outputMode("append") \
    .option("path",path) \
    .option("checkpointLocation",cp) \
    .start()

batch_job.awaitTermination()

我遇到的问题是,只有对 df_alarmsFromKafka 的操作是每批 运行。
例如,如果我想在每批次中评估一个简单的打印件,这似乎是不可能的,因为它显然只在第一次打印和评估时才被打印出来。

是否有不同的方法使我能够在批次之间进行其他操作,而不仅仅是那些与 Dataframe.writeStream 评估严格相关的操作?

啊,我想我明白你的意思 “如果我想每批次评估一个简单的印刷品”。您似乎在要求 foreach operator:

Sets the output of the streaming query to be processed using the provided writer f. This is often used to write the output of a streaming query to arbitrary storage systems.

伪代码如下所示:

df_alarmsFromKafka = df_alarmsFromKafka.drop("test")
df_alarmsFromKafka.foreach(print("runs every batch, in the stream"))

batch_job=df_alarmsFromKafka.writeStream \
    .format("parquet").outputMode("append") \
    .option("path",path) \
    .option("checkpointLocation",cp) \
    .start()

以上实际上会启动 2 个流式查询(对于一个输入)。