将多个查询用于 Spark 结构化流的用例

Use cases for using multiple queries for Spark Structured streaming

我需要从多个 Kafka 主题[基于 Avro] 进行流式传输,并将它们放入 Greenplum 中,并对负载进行少量修改。

Kaka 主题在配置文件中定义为一个列表,每个 Kafka 主题都有一个目标 table。

我正在寻找单个 Spark Structured 应用程序和配置文件中的更新以收听新主题或停止。听题目。

我正在寻求帮助,因为我对使用单个查询还是使用多个查询感到困惑:

val query1 = df.writeStream.start()
val query2 = df.writeStream.start()

spark.streams.awaitAnyTermination()

df.writeStream.start().awaitAnyTermination()

在哪些用例下应该使用多个查询而不是单个查询

显然,您可以使用正则表达式模式来使用来自不同 kafka 主题的数据。

比方说,您有诸如“topic-ingestion1”、“topic-ingestion2”之类的主题名称 - 然后您可以创建一个正则表达式模式来使用所有以“*ingestion”结尾的主题的数据。

一旦以正则表达式模式的格式创建了新主题 - spark 将自动开始从新创建的主题流式传输数据。

参考: [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]

您可以使用此参数来指定缓存超时。 “spark.kafka.consumer.cache.timeout”。

来自 spark 文档:

spark.kafka.consumer.cache.timeout - The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.

比方说,如果你有多个接收器,你正在从 kafka 读取数据,并且你正在将它写入两个不同的位置,如 hdfs 和 hbase - 那么你可以将你的应用程序逻辑分支到两个 writeStreams。

如果接收器 (Greenplum) 支持批处理操作模式 - 那么您可以查看 spark 结构化流中的 forEachBatch() 函数。它将允许我们为两个操作重用相同的 batchDF。

参考: [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#consumer-caching]