Kafka 流抑制不 return 任何值
Kafka streams suppress does not return any values
我在一个相对简单的窗口化字数统计示例上苦苦挣扎。
我试图只获得窗口结果,但根本没有收到任何东西。
KStream<String, Long> sl = s
...
.groupBy((key, value) -> value)
.windowedBy(of(ofSeconds(5))
.advanceBy(ofSeconds(3))
.grace(ofSeconds(2)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"counts-store").withRetention(ofSeconds(7)))
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value))
.to(outputTopicName, produced);
我正在输入一些内容:
inputWords.pipeInput(new TestRecord<>("word", "a b c", now));
inputWords.pipeInput(new TestRecord<>("word", "a c d c", now.plus(ofSeconds(6))));
inputWords.pipeInput(new TestRecord<>("word", "", now.plus(Duration.ofDays(1))));
但是什么也没有发出。
有人知道可能的解决方案吗?
如您所见,我已经在使用宽限期和保留期,正如其他人所写的那样,这可能有所帮助,但实际上并没有帮助。
在评论抑制线一切正常。
您必须为您的计数 Materialized
视图提供有效的 Serdes
以便 Kafka Stream 可以正确地为内部抑制处理器提供有效的 Window Serdes,否则该处理器将选择默认值可能导致序列化无法正常工作的关键 serdes,我在 KTableSuppressProcessor.buffer()
中得到以下异常:
//please check if you get this exception
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
正确地为物化视图counts-store
提供有效的Serde
,您应该得到预期的输出:
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
.withRetention(ofSeconds(7))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
我在一个相对简单的窗口化字数统计示例上苦苦挣扎。 我试图只获得窗口结果,但根本没有收到任何东西。
KStream<String, Long> sl = s
...
.groupBy((key, value) -> value)
.windowedBy(of(ofSeconds(5))
.advanceBy(ofSeconds(3))
.grace(ofSeconds(2)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"counts-store").withRetention(ofSeconds(7)))
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value))
.to(outputTopicName, produced);
我正在输入一些内容:
inputWords.pipeInput(new TestRecord<>("word", "a b c", now));
inputWords.pipeInput(new TestRecord<>("word", "a c d c", now.plus(ofSeconds(6))));
inputWords.pipeInput(new TestRecord<>("word", "", now.plus(Duration.ofDays(1))));
但是什么也没有发出。 有人知道可能的解决方案吗?
如您所见,我已经在使用宽限期和保留期,正如其他人所写的那样,这可能有所帮助,但实际上并没有帮助。 在评论抑制线一切正常。
您必须为您的计数 Materialized
视图提供有效的 Serdes
以便 Kafka Stream 可以正确地为内部抑制处理器提供有效的 Window Serdes,否则该处理器将选择默认值可能导致序列化无法正常工作的关键 serdes,我在 KTableSuppressProcessor.buffer()
中得到以下异常:
//please check if you get this exception
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
正确地为物化视图counts-store
提供有效的Serde
,您应该得到预期的输出:
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
.withRetention(ofSeconds(7))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())