为什么我看不到 Kafka Streams reduce 方法的任何输出?

Why don't I see any output from the Kafka Streams reduce method?

给定以下代码:

KStream<String, Custom> stream =  
    builder.stream(Serdes.String(), customSerde, "test_in");

stream
    .groupByKey(Serdes.String(), customSerde)
    .reduce(new CustomReducer(), "reduction_state")
    .print(Serdes.String(), customSerde);

我在 Reducer 的 apply 方法中有一个 println 语句,当我期望减少发生时它会成功打印出来。但是,上面显示的最终打印语句什么也不显示。同样,如果我使用 to 方法而不是 print,我在目标主题中看不到任何消息。

在reduce 语句之后我需要什么才能看到减少的结果?如果一个值被推送到输入,我不希望看到任何东西。如果推送具有相同键的第二个值,我希望 reducer 应用(它确实如此)并且我还希望减少的结果继续到处理管道中的下一步。如上所述,我在管道的后续步骤中没有看到任何东西,我不明白为什么。

从 Kafka 0.10.1.0 开始,所有聚合运算符都使用内部重复数据删除缓存来减少结果 KTable 变更日志流的负载。例如,如果您直接计数并处理两条具有相同键的记录,则完整的变更日志流将为 <key:1>, <key:2>.

使用新的缓存功能,缓存会接收 <key:1> 并存储它,但不会立即将其发送到下游。当计算 <key:2> 时,它会替换缓存的第一个条目。根据缓存大小、不同键的数量、吞吐量和您的提交间隔,缓存向下游发送条目。这发生在单个键条目的缓存逐出或完全刷新缓存(向下游发送所有条目)时。因此,KTable 更新日志可能只显示 <key:2>(因为 <key:1> 被删除了)。

您可以通过 Streams 配置参数 StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG 控制缓存的大小。如果将该值设置为零,则完全禁用缓存并且 KTable 更改日志将包含所有更新(有效地提供 0.10.0.0 之前的行为)。

Confluent 文档中有一节更详细地解释了缓存: