如何运行 多个结构化流并行?

How to run multiple structured streams in parallel?

我正在尝试写入并行流,但只有最后一个在工作。

这是我的代码。

val trump_topic = filteredQueryTrump.select($"ID".as("key"), $"title".as("value"))
   .writeStream.format("kafka")
   .trigger(Trigger.ProcessingTime("10 seconds"))
   .option("kafka.security.protocol", "SASL_SSL")   
   .option("kafka.sasl.mechanism", "SCRAM-SHA-256")   
   .option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
   .option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
   .option("topic", "trump")
   .option("checkpointLocation", "/tmp").start()

val biden_topic = filteredQueryBiden.select($"ID".as("key"), $"title".as("value"))
   .writeStream.format("kafka") 
   .trigger(Trigger.ProcessingTime("4 seconds"))
   .option("kafka.security.protocol", "SASL_SSL")   
   .option("kafka.sasl.mechanism", "SCRAM-SHA-256")   
   .option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
   .option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
   .option("topic", "biden")
   .option("checkpointLocation", "/tmp").start()

我尝试了spark.streams.awaitAnyTermination(),但也没有用。

每个输出流需要两个不同的检查点位置

此外,请务必添加 awaitAnyTermination,因为您已经尝试过以确保作业持续 运行。

spark.streams.awaitAnyTermination()

作为替代方案,您可以通过执行以下操作将这两个查询合二为一

val combinedDf = filteredQueryTrump.select($"ID".as("key"), $"title".as("value"), lit("trump").as("topic"))
  .union(filteredQueryBiden.select($"ID".as("key"), $"title".as("value"), lit("biden").as("topic")))

topic 列的内容将由 spark-sql-kafka-0-10 库解释,以决定该行应写入哪个主题。理论上,您可以使用该技术在一个流中写入多个 Kafka 主题。执行此操作时,重要的是 在调用 writeStream 时不要使用 主题选项。但是,这样一来,您将无法拥有两个不同的触发器持续时间,而只能为整个流设置一个。

您编写的示例可能如下所示:

val outputQuery = combinedDf.select($"key", $"value", $"topic")
  .writeStream
  .format("kafka") 
  .trigger(Trigger.ProcessingTime("4 seconds"))
  .option("kafka.security.protocol", "SASL_SSL")   
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")   
  .option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
  .option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
  .option("checkpointLocation", "/tmp")

outputQuery.start()
outputQuery.awaitTermination()