如何运行 多个结构化流并行?
How to run multiple structured streams in parallel?
我正在尝试写入并行流,但只有最后一个在工作。
这是我的代码。
val trump_topic = filteredQueryTrump.select($"ID".as("key"), $"title".as("value"))
.writeStream.format("kafka")
.trigger(Trigger.ProcessingTime("10 seconds"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
.option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
.option("topic", "trump")
.option("checkpointLocation", "/tmp").start()
val biden_topic = filteredQueryBiden.select($"ID".as("key"), $"title".as("value"))
.writeStream.format("kafka")
.trigger(Trigger.ProcessingTime("4 seconds"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
.option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
.option("topic", "biden")
.option("checkpointLocation", "/tmp").start()
我尝试了spark.streams.awaitAnyTermination()
,但也没有用。
每个输出流需要两个不同的检查点位置。
此外,请务必添加 awaitAnyTermination
,因为您已经尝试过以确保作业持续 运行。
spark.streams.awaitAnyTermination()
作为替代方案,您可以通过执行以下操作将这两个查询合二为一
val combinedDf = filteredQueryTrump.select($"ID".as("key"), $"title".as("value"), lit("trump").as("topic"))
.union(filteredQueryBiden.select($"ID".as("key"), $"title".as("value"), lit("biden").as("topic")))
topic
列的内容将由 spark-sql-kafka-0-10 库解释,以决定该行应写入哪个主题。理论上,您可以使用该技术在一个流中写入多个 Kafka 主题。执行此操作时,重要的是 在调用 writeStream
时不要使用 主题选项。但是,这样一来,您将无法拥有两个不同的触发器持续时间,而只能为整个流设置一个。
您编写的示例可能如下所示:
val outputQuery = combinedDf.select($"key", $"value", $"topic")
.writeStream
.format("kafka")
.trigger(Trigger.ProcessingTime("4 seconds"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
.option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
.option("checkpointLocation", "/tmp")
outputQuery.start()
outputQuery.awaitTermination()
我正在尝试写入并行流,但只有最后一个在工作。
这是我的代码。
val trump_topic = filteredQueryTrump.select($"ID".as("key"), $"title".as("value"))
.writeStream.format("kafka")
.trigger(Trigger.ProcessingTime("10 seconds"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
.option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
.option("topic", "trump")
.option("checkpointLocation", "/tmp").start()
val biden_topic = filteredQueryBiden.select($"ID".as("key"), $"title".as("value"))
.writeStream.format("kafka")
.trigger(Trigger.ProcessingTime("4 seconds"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
.option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
.option("topic", "biden")
.option("checkpointLocation", "/tmp").start()
我尝试了spark.streams.awaitAnyTermination()
,但也没有用。
每个输出流需要两个不同的检查点位置。
此外,请务必添加 awaitAnyTermination
,因为您已经尝试过以确保作业持续 运行。
spark.streams.awaitAnyTermination()
作为替代方案,您可以通过执行以下操作将这两个查询合二为一
val combinedDf = filteredQueryTrump.select($"ID".as("key"), $"title".as("value"), lit("trump").as("topic"))
.union(filteredQueryBiden.select($"ID".as("key"), $"title".as("value"), lit("biden").as("topic")))
topic
列的内容将由 spark-sql-kafka-0-10 库解释,以决定该行应写入哪个主题。理论上,您可以使用该技术在一个流中写入多个 Kafka 主题。执行此操作时,重要的是 在调用 writeStream
时不要使用 主题选项。但是,这样一来,您将无法拥有两个不同的触发器持续时间,而只能为整个流设置一个。
您编写的示例可能如下所示:
val outputQuery = combinedDf.select($"key", $"value", $"topic")
.writeStream
.format("kafka")
.trigger(Trigger.ProcessingTime("4 seconds"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";""")
.option("kafka.bootstrap.servers", "rocket-01.srvs.cloudkafka.com:9094")
.option("checkpointLocation", "/tmp")
outputQuery.start()
outputQuery.awaitTermination()