How does Structured Streaming execute separate streaming queries (in parallel or sequentially)?
I'm writing a test application that consumes messages from Kafka topics and then pushes the data to S3 and to RDBMS tables (the flow is similar to the one presented here: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html). So I read data from Kafka and then:
- every message should be saved to S3
- some messages should be saved to table A in an external database (based on a filter condition)
- some other messages should be saved to table B in the external database (based on other filter conditions)
So I have something like this:
Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1,topic2,topic3")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"));
(Please note that I'm reading data from more than one Kafka topic.)
Next I define the required Datasets:
Dataset<Row> allMessages = df.select(.....)
Dataset<Row> messagesOfType1 = df.select() //some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions
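(For illustration only, the elided conditions above might look like the sketch below; the parsed_value columns, the type column, and its values are hypothetical names, not part of the original question:)

import static org.apache.spark.sql.functions.col;

// Hypothetical filters standing in for the elided conditions above.
Dataset<Row> allMessages = df.select(col("parsed_value.*"));
Dataset<Row> messagesOfType1 = allMessages.filter(col("type").equalTo("type1"));
Dataset<Row> messagesOfType2 = allMessages.filter(col("type").equalTo("type2"));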
Now I create a query for each Dataset to start the processing:
StreamingQuery s3Query = allMessages
    .writeStream()
    .format("parquet")
    .option("path", "s3_location")
    .option("checkpointLocation", "s3_checkpoint_location") // the file sink requires a checkpoint location; startingOffsets is a read option and has no effect on writeStream
    .start();
StreamingQuery firstQuery = messagesOfType1
    .writeStream()
    .foreach(new CustomForEachWriterType1()) // class that extends ForeachWriter<Row> and saves data into an external RDBMS table
    .start();
StreamingQuery secondQuery = messagesOfType2
    .writeStream()
    .foreach(new CustomForEachWriterType2()) // class that extends ForeachWriter<Row> and saves data into an external RDBMS table (maybe even a different database than before)
    .start();
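(For completeness, a minimal sketch of what such a CustomForEachWriterType1 could look like; the JDBC URL, credentials, and SQL statement are placeholders, not part of the original question:)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

public class CustomForEachWriterType1 extends ForeachWriter<Row> {
    private Connection connection;
    private PreparedStatement statement;

    @Override
    public boolean open(long partitionId, long version) {
        try {
            // One connection per partition per trigger; returning false skips this partition.
            connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "password"); // placeholder URL and credentials
            statement = connection.prepareStatement(
                "INSERT INTO table_a (payload) VALUES (?)"); // placeholder SQL
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void process(Row row) {
        try {
            // Real code would bind individual columns; row.toString() is just a stand-in.
            statement.setString(1, row.toString());
            statement.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        try {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (Exception ignored) {
        }
    }
}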
Now I'm wondering:
Will those queries be executed in parallel (or one after another in FIFO order, so that I should assign those queries to separate scheduler pools)?
Will those queries be executed in parallel?
Yes. These queries will be executed in parallel (on every trigger, which you did not specify, so they run as fast as possible).
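(As a side note, a trigger could be set explicitly per query; a minimal sketch, where the interval and the checkpoint path are arbitrary assumptions:)

import org.apache.spark.sql.streaming.Trigger;

// Without an explicit trigger, the next micro-batch starts as soon as the
// previous one finishes; Trigger.ProcessingTime throttles it to a fixed cadence.
allMessages
    .writeStream()
    .format("parquet")
    .option("path", "s3_location")
    .option("checkpointLocation", "s3_checkpoint_location")
    .trigger(Trigger.ProcessingTime("30 seconds")) // hypothetical interval
    .start();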
Internally, when you execute start on a DataStreamWriter, you create a StreamExecution that in turn immediately creates the so-called daemon microBatchThread (quoted from the Spark source code below):
val microBatchThread =
  new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
    override def run(): Unit = {
      // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
      // thread to this micro batch thread
      sparkSession.sparkContext.setCallSite(callSite)
      runBatches()
    }
  }
You can see each query in its own thread with the name:
stream execution thread for [prettyIdString]
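(So you don't need to hand-roll any threading yourself; you just have to keep the driver alive. A minimal sketch of the wiring; the pool names are assumptions, and the fair-scheduler part only matters if spark.scheduler.mode is set to FAIR:)

// Optional: route each query's jobs to a separate fair-scheduler pool.
// Local properties are inherited by the query's stream execution thread,
// so set the pool right before calling start(). Pool names are arbitrary.
spark.sparkContext().setLocalProperty("spark.scheduler.pool", "s3_pool");
StreamingQuery s3Query = allMessages
    .writeStream()
    .format("parquet")
    .option("path", "s3_location")
    .option("checkpointLocation", "s3_checkpoint_location")
    .start();

spark.sparkContext().setLocalProperty("spark.scheduler.pool", "rdbms_pool");
StreamingQuery firstQuery = messagesOfType1
    .writeStream()
    .foreach(new CustomForEachWriterType1())
    .start();

// start() returns immediately; block the driver until any query terminates
// (throws StreamingQueryException if a query fails).
spark.streams().awaitAnyTermination();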