根据 Dataframe 中的条件向 Kafka 主题发送数据

Send data to Kafka topics based on a condition in Dataframe

我想根据 SparkStreaming 中的数据值更改 Kafka 主题目标以保存数据。 是否有可能再次这样做? 当我尝试下面的代码时,它只执行了第一个,而没有执行下面的过程。

(testdf 
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
      )
            
(testdf 
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
      )

数据存储在主题名称test中。 谁能想办法做到这一点?

我改变了保存这样一个数据框的目的地。

|type|value|
| A  |testvalue|
| B  |testvalue|

键入 A 进行主题测试。 输入 B 到主题 testB.

使用最新版本的 Spark,您只需在数据框中创建一个列 topic,用于将记录定向到相应的主题。

在你的情况下,这意味着你可以做类似

的事情
testdf 
  .withColumn("topic", when(f.col("value") == "A", lit("test")).otherwise(lit("testB"))
  .selectExpr("CAST(value as STRING) as value", "topic") 
  .writeStream .format("kafka") 
  .option("checkpointLocation", "/checkpoint_1") 
  .option("kafka.bootstrap.servers","~~:9092")
  .start()

谢谢迈克。 我能够通过 运行 以下代码实现此目的!

(
testdf 
  .withColumn("topic",f.when(f.col("testTime") == "A", f.lit("test")).otherwise(("testB")))
  .selectExpr("CAST(value as STRING) as value", "topic") 
  .writeStream
  .format("kafka") 
  .option("checkpointLocation", "/checkpoint_2") 
  .option("startingOffsets", "latest")
  .option("kafka.bootstrap.servers","9092")
  .start()
)