如何将处理后的spark流插入到kafka中
How to insert processed spark stream into kafka
我正在尝试在使用以下代码段处理后将 spark 流插入 kafka
query = ds1 \
.selectExpr("CAST(value AS STRING)")\
.writeStream\
.foreachBatch(do_something) \
.format("kafka") \
.option("topic","topic-name") \
.option("kafka.bootstrap.servers", "borkers-IPs") \
.option("checkpointLocation", "/home/location") \
.start()
但它似乎插入的是原始流而不是处理后的流。
如您所见,foreachBatch 的使用在这里没有任何效果。 Spark 不会产生错误,它就像进入了虚空。
Quote from the manuals:
Structured Streaming APIs provide two ways to write the output of a
streaming query to data sources that do not have an existing streaming
sink: foreachBatch() and foreach().
这本优秀的读物正是您要找的。
https://aseigneurin.github.io/2018/08/14/kafka-tutorial-8-spark-structured-streaming.html
我正在尝试在使用以下代码段处理后将 spark 流插入 kafka
query = ds1 \
.selectExpr("CAST(value AS STRING)")\
.writeStream\
.foreachBatch(do_something) \
.format("kafka") \
.option("topic","topic-name") \
.option("kafka.bootstrap.servers", "borkers-IPs") \
.option("checkpointLocation", "/home/location") \
.start()
但它似乎插入的是原始流而不是处理后的流。
如您所见,foreachBatch 的使用在这里没有任何效果。 Spark 不会产生错误,它就像进入了虚空。
Quote from the manuals:
Structured Streaming APIs provide two ways to write the output of a streaming query to data sources that do not have an existing streaming sink: foreachBatch() and foreach().
这本优秀的读物正是您要找的。
https://aseigneurin.github.io/2018/08/14/kafka-tutorial-8-spark-structured-streaming.html