将多个查询用于 Spark 结构化流的用例
Use cases for using multiple queries for Spark Structured streaming
我需要从多个 Kafka 主题[基于 Avro] 进行流式传输,并将它们放入 Greenplum 中,并对负载进行少量修改。
Kaka 主题在配置文件中定义为一个列表,每个 Kafka 主题都有一个目标 table。
我正在寻找单个 Spark Structured 应用程序和配置文件中的更新以收听新主题或停止。听题目。
我正在寻求帮助,因为我对使用单个查询还是使用多个查询感到困惑:
val query1 = df.writeStream.start()
val query2 = df.writeStream.start()
spark.streams.awaitAnyTermination()
或
df.writeStream.start().awaitAnyTermination()
在哪些用例下应该使用多个查询而不是单个查询
显然,您可以使用正则表达式模式来使用来自不同 kafka 主题的数据。
比方说,您有诸如“topic-ingestion1”、“topic-ingestion2”之类的主题名称 - 然后您可以创建一个正则表达式模式来使用所有以“*ingestion”结尾的主题的数据。
一旦以正则表达式模式的格式创建了新主题 - spark 将自动开始从新创建的主题流式传输数据。
参考:
[https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]
您可以使用此参数来指定缓存超时。
“spark.kafka.consumer.cache.timeout”。
来自 spark 文档:
spark.kafka.consumer.cache.timeout - The minimum amount of time a
consumer may sit idle in the pool before it is eligible for eviction
by the evictor.
比方说,如果你有多个接收器,你正在从 kafka 读取数据,并且你正在将它写入两个不同的位置,如 hdfs 和 hbase - 那么你可以将你的应用程序逻辑分支到两个 writeStreams。
如果接收器 (Greenplum) 支持批处理操作模式 - 那么您可以查看 spark 结构化流中的 forEachBatch() 函数。它将允许我们为两个操作重用相同的 batchDF。
参考:
[https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]
我需要从多个 Kafka 主题[基于 Avro] 进行流式传输,并将它们放入 Greenplum 中,并对负载进行少量修改。
Kaka 主题在配置文件中定义为一个列表,每个 Kafka 主题都有一个目标 table。
我正在寻找单个 Spark Structured 应用程序和配置文件中的更新以收听新主题或停止。听题目。
我正在寻求帮助,因为我对使用单个查询还是使用多个查询感到困惑:
val query1 = df.writeStream.start()
val query2 = df.writeStream.start()
spark.streams.awaitAnyTermination()
或
df.writeStream.start().awaitAnyTermination()
在哪些用例下应该使用多个查询而不是单个查询
显然,您可以使用正则表达式模式来使用来自不同 kafka 主题的数据。
比方说,您有诸如“topic-ingestion1”、“topic-ingestion2”之类的主题名称 - 然后您可以创建一个正则表达式模式来使用所有以“*ingestion”结尾的主题的数据。
一旦以正则表达式模式的格式创建了新主题 - spark 将自动开始从新创建的主题流式传输数据。
参考: [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]
您可以使用此参数来指定缓存超时。 “spark.kafka.consumer.cache.timeout”。
来自 spark 文档:
spark.kafka.consumer.cache.timeout - The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.
比方说,如果你有多个接收器,你正在从 kafka 读取数据,并且你正在将它写入两个不同的位置,如 hdfs 和 hbase - 那么你可以将你的应用程序逻辑分支到两个 writeStreams。
如果接收器 (Greenplum) 支持批处理操作模式 - 那么您可以查看 spark 结构化流中的 forEachBatch() 函数。它将允许我们为两个操作重用相同的 batchDF。
参考: [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]