根据 Dataframe 中的条件向 Kafka 主题发送数据
Send data to Kafka topics based on a condition in Dataframe
我想根据 SparkStreaming 中的数据值更改 Kafka 主题目标以保存数据。
是否有可能再次这样做?
当我尝试下面的代码时,它只执行了第一个,而没有执行下面的过程。
(testdf
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
)
(testdf
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
)
数据存储在主题名称test中。
谁能想办法做到这一点?
我改变了保存这样一个数据框的目的地。
|type|value|
| A |testvalue|
| B |testvalue|
键入 A 进行主题测试。
输入 B 到主题 testB.
使用最新版本的 Spark,您只需在数据框中创建一个列 topic
,用于将记录定向到相应的主题。
在你的情况下,这意味着你可以做类似
的事情
testdf
.withColumn("topic", when(f.col("value") == "A", lit("test")).otherwise(lit("testB"))
.selectExpr("CAST(value as STRING) as value", "topic")
.writeStream .format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.start()
谢谢迈克。
我能够通过 运行 以下代码实现此目的!
(
testdf
.withColumn("topic",f.when(f.col("testTime") == "A", f.lit("test")).otherwise(("testB")))
.selectExpr("CAST(value as STRING) as value", "topic")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("startingOffsets", "latest")
.option("kafka.bootstrap.servers","9092")
.start()
)
我想根据 SparkStreaming 中的数据值更改 Kafka 主题目标以保存数据。 是否有可能再次这样做? 当我尝试下面的代码时,它只执行了第一个,而没有执行下面的过程。
(testdf
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
)
(testdf
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
)
数据存储在主题名称test中。 谁能想办法做到这一点?
我改变了保存这样一个数据框的目的地。
|type|value|
| A |testvalue|
| B |testvalue|
键入 A 进行主题测试。 输入 B 到主题 testB.
使用最新版本的 Spark,您只需在数据框中创建一个列 topic
,用于将记录定向到相应的主题。
在你的情况下,这意味着你可以做类似
的事情testdf
.withColumn("topic", when(f.col("value") == "A", lit("test")).otherwise(lit("testB"))
.selectExpr("CAST(value as STRING) as value", "topic")
.writeStream .format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.start()
谢谢迈克。 我能够通过 运行 以下代码实现此目的!
(
testdf
.withColumn("topic",f.when(f.col("testTime") == "A", f.lit("test")).otherwise(("testB")))
.selectExpr("CAST(value as STRING) as value", "topic")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("startingOffsets", "latest")
.option("kafka.bootstrap.servers","9092")
.start()
)