Kafka 流抑制不 return 任何值

Kafka streams suppress does not return any values

我在一个相对简单的窗口化字数统计示例上苦苦挣扎。 我试图只获得窗口结果,但根本没有收到任何东西。

KStream<String, Long> sl = s
    ...
    .groupBy((key, value) -> value)
    .windowedBy(of(ofSeconds(5))
        .advanceBy(ofSeconds(3))
        .grace(ofSeconds(2)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
        "counts-store").withRetention(ofSeconds(7)))
    .suppress(untilWindowCloses(unbounded()))
    .toStream()
    .map((key, value) -> new KeyValue<>(key.key(), value))
    .to(outputTopicName, produced);

我正在输入一些内容:

inputWords.pipeInput(new TestRecord<>("word", "a b c", now));
inputWords.pipeInput(new TestRecord<>("word", "a c d c", now.plus(ofSeconds(6))));
inputWords.pipeInput(new TestRecord<>("word", "", now.plus(Duration.ofDays(1))));

但是什么也没有发出。 有人知道可能的解决方案吗?

如您所见,我已经在使用宽限期和保留期,正如其他人所写的那样,这可能有所帮助,但实际上并没有帮助。 在评论抑制线一切正常。

您必须为您的计数 Materialized 视图提供有效的 Serdes 以便 Kafka Stream 可以正确地为内部抑制处理器提供有效的 Window Serdes,否则该处理器将选择默认值可能导致序列化无法正常工作的关键 serdes,我在 KTableSuppressProcessor.buffer() 中得到以下异常:

//please check if you get this exception
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)

正确地为物化视图counts-store提供有效的Serde,您应该得到预期的输出:

Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
    .withRetention(ofSeconds(7))
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long())