KafkaStreams:获得 Window 最终结果
KafkaStreams: Getting Window Final Results
是否可以通过抑制中间结果在 Kafka Streams 中获得 window final result。
我无法实现这个目标。我的代码有什么问题?
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
.toStream()
.print(Printed.toSysOut())
它导致了这个错误:
Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
代码/错误详细信息:https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
创建流时添加Consumed
:builder.stream<String,Double>(inputTopic, Consumed.
with(Serdes.String(), Serdes.Double())
KeySerde 有问题。由于 WindowedBy
操作导致 Windowed<String>
类型密钥,但 .suppress()
使用默认密钥类型。
因此,您需要在调用 count 方法时在 State 存储上定义 KeySerde,如下所示:
builder.stream<String,Double>inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("count").withCachingDisabled().withKeySerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
. print(Printed.toSysOut());
问题是 Streams 在 windowing 期间自动包装显式 serde,但不会自动包装默认 serde 的方式存在令人困惑的不对称性。恕我直言,这是一个应该纠正的疏忽,所以我提交了:https://issues.apache.org/jira/browse/KAFKA-7806
正如其他人指出的那样,解决方案是在上游显式设置密钥 serde,而不是依赖默认密钥 serde。您可以:
使用 Materialized
在 windowed 聚合上设置 serdes
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(尼舒推荐)
(请注意,不需要 来命名 count
操作,这会产生使其可查询的副作用)
或者在更上游设置 serdes,例如在输入上:
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(正如 wardziniak 推荐的那样)
选择权在你;我认为在这种情况下,这两种情况都没有太大区别。如果您进行的聚合与 count
不同,您可能会通过 Materialized
设置值 serde,所以前者可能是更统一的样式。
我还注意到您的 window 定义没有设置宽限期。 window 关闭时间定义为 window end + grace period
,默认值为 24 小时,因此在 24 小时的数据通过 运行 之前,您不会看到抑制发出的任何内容应用
为了您的测试工作,我建议尝试:
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
在生产中,您需要 select 一个宽限期来平衡您在流中预期的事件延迟量与您希望从抑制中看到的发射及时性量。
最后一点,我在你的要点中注意到你没有更改默认缓存或提交间隔。因此,您会注意到 count
运算符本身会将更新缓冲默认的 30 秒,然后再将它们传递给抑制。这是一个很好的生产配置,因此您不会对本地磁盘或 Kafka 代理造成瓶颈。但在您测试时它可能会让您大吃一惊。
通常用于测试(或交互式尝试),我将禁用缓存并设置较短的提交间隔以最大程度地提高开发人员的理智:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
对于 serde 的疏忽,我们深表歉意。我希望我们能尽快解决 KAFKA-7806。
希望对您有所帮助!
是否可以通过抑制中间结果在 Kafka Streams 中获得 window final result。
我无法实现这个目标。我的代码有什么问题?
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
.toStream()
.print(Printed.toSysOut())
它导致了这个错误:
Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
代码/错误详细信息:https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
创建流时添加Consumed
:builder.stream<String,Double>(inputTopic, Consumed.
with(Serdes.String(), Serdes.Double())
KeySerde 有问题。由于 WindowedBy
操作导致 Windowed<String>
类型密钥,但 .suppress()
使用默认密钥类型。
因此,您需要在调用 count 方法时在 State 存储上定义 KeySerde,如下所示:
builder.stream<String,Double>inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("count").withCachingDisabled().withKeySerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
. print(Printed.toSysOut());
问题是 Streams 在 windowing 期间自动包装显式 serde,但不会自动包装默认 serde 的方式存在令人困惑的不对称性。恕我直言,这是一个应该纠正的疏忽,所以我提交了:https://issues.apache.org/jira/browse/KAFKA-7806
正如其他人指出的那样,解决方案是在上游显式设置密钥 serde,而不是依赖默认密钥 serde。您可以:
使用 Materialized
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(尼舒推荐)
(请注意,不需要 来命名 count
操作,这会产生使其可查询的副作用)
或者在更上游设置 serdes,例如在输入上:
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
(正如 wardziniak 推荐的那样)
选择权在你;我认为在这种情况下,这两种情况都没有太大区别。如果您进行的聚合与 count
不同,您可能会通过 Materialized
设置值 serde,所以前者可能是更统一的样式。
我还注意到您的 window 定义没有设置宽限期。 window 关闭时间定义为 window end + grace period
,默认值为 24 小时,因此在 24 小时的数据通过 运行 之前,您不会看到抑制发出的任何内容应用
为了您的测试工作,我建议尝试:
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
在生产中,您需要 select 一个宽限期来平衡您在流中预期的事件延迟量与您希望从抑制中看到的发射及时性量。
最后一点,我在你的要点中注意到你没有更改默认缓存或提交间隔。因此,您会注意到 count
运算符本身会将更新缓冲默认的 30 秒,然后再将它们传递给抑制。这是一个很好的生产配置,因此您不会对本地磁盘或 Kafka 代理造成瓶颈。但在您测试时它可能会让您大吃一惊。
通常用于测试(或交互式尝试),我将禁用缓存并设置较短的提交间隔以最大程度地提高开发人员的理智:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
对于 serde 的疏忽,我们深表歉意。我希望我们能尽快解决 KAFKA-7806。
希望对您有所帮助!