如何只在至少有N行时才执行流处理?
How to execute stream processing only when there are at least N rows?
我在 Kafka 消费者处有以下 spark SQL/Streaming 查询,我如何指定当批量大小达到特定大小 N 时获取应该是有条件的,否则消费者应该在处理之前缓冲元素,所以每当我想执行我的逻辑时,它保证我有一个大小为 N 的精确 Dataset<VideoEventData>
。当前代码:
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
.option("subscribe", prop.getProperty("kafka.topic"))
.option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
.option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(VideoEventData.class));
您可以通过配置 Kafka 消费者本身来做到这一点。将 fetch.min.bytes
设置为您想要的最小值。这将告诉 Kafka 等到它有足够的数据。
有一个相关的设置,fetch.max.wait.ms
,它控制了 Kafka 最多等待多长时间。该值默认为 500 毫秒。您可以阅读更多 here.
i want to execute my logic it is gurateed that i have an exact Dataset of size N
开箱即用的 Spark Structured Streaming(和一般的 Spark)不可能做到这一点。
您有以下选择:
使用 Kafka 消费者属性配置位于 kafka 源后面的 Kafka 消费者。
自己缓冲行作为任意有状态聚合的一部分。
编写自定义源来处理缓冲本身。
对于 2。我可以使用 KeyValueGroupedDataset.flatMapGroupsWithState 和一个状态,该状态会累积超过 "chunks",最终会给你大小 N。
对于 3. 实施自定义 有状态 流式传输 Source 以 [=10= 的方式实施 getOffset
和 getBatch
] 只会在至少有 N
行时给出偏移量。
免责声明:我以前从未自己做过任何一种解决方案,但它们看起来可行。
我在 Kafka 消费者处有以下 spark SQL/Streaming 查询,我如何指定当批量大小达到特定大小 N 时获取应该是有条件的,否则消费者应该在处理之前缓冲元素,所以每当我想执行我的逻辑时,它保证我有一个大小为 N 的精确 Dataset<VideoEventData>
。当前代码:
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
.option("subscribe", prop.getProperty("kafka.topic"))
.option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
.option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(VideoEventData.class));
您可以通过配置 Kafka 消费者本身来做到这一点。将 fetch.min.bytes
设置为您想要的最小值。这将告诉 Kafka 等到它有足够的数据。
有一个相关的设置,fetch.max.wait.ms
,它控制了 Kafka 最多等待多长时间。该值默认为 500 毫秒。您可以阅读更多 here.
i want to execute my logic it is gurateed that i have an exact Dataset of size N
开箱即用的 Spark Structured Streaming(和一般的 Spark)不可能做到这一点。
您有以下选择:
使用 Kafka 消费者属性配置位于 kafka 源后面的 Kafka 消费者。
自己缓冲行作为任意有状态聚合的一部分。
编写自定义源来处理缓冲本身。
对于 2。我可以使用 KeyValueGroupedDataset.flatMapGroupsWithState 和一个状态,该状态会累积超过 "chunks",最终会给你大小 N。
对于 3. 实施自定义 有状态 流式传输 Source 以 [=10= 的方式实施 getOffset
和 getBatch
] 只会在至少有 N
行时给出偏移量。
免责声明:我以前从未自己做过任何一种解决方案,但它们看起来可行。