如何将处理后的spark流插入到kafka中

How to insert processed spark stream into kafka

我正在尝试在使用以下代码段处理后将 spark 流插入 kafka

query = ds1 \
    .selectExpr("CAST(value AS STRING)")\
    .writeStream\
    .foreachBatch(do_something) \
    .format("kafka") \
    .option("topic","topic-name") \
    .option("kafka.bootstrap.servers", "borkers-IPs") \
    .option("checkpointLocation", "/home/location") \
    .start()

但它似乎插入的是原始流而不是处理后的流。

如您所见,foreachBatch 的使用在这里没有任何效果。 Spark 不会产生错误,它就像进入了虚空。

Quote from the manuals:

Structured Streaming APIs provide two ways to write the output of a streaming query to data sources that do not have an existing streaming sink: foreachBatch() and foreach().

这本优秀的读物正是您要找的。

https://aseigneurin.github.io/2018/08/14/kafka-tutorial-8-spark-structured-streaming.html