为什么我看不到 Kafka Streams reduce 方法的任何输出?
Why don't I see any output from the Kafka Streams reduce method?
给定以下代码:
KStream<String, Custom> stream =
builder.stream(Serdes.String(), customSerde, "test_in");
stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")
.print(Serdes.String(), customSerde);
我在 Reducer 的 apply 方法中有一个 println
语句,当我期望减少发生时它会成功打印出来。但是,上面显示的最终打印语句什么也不显示。同样,如果我使用 to
方法而不是 print
,我在目标主题中看不到任何消息。
在reduce 语句之后我需要什么才能看到减少的结果?如果一个值被推送到输入,我不希望看到任何东西。如果推送具有相同键的第二个值,我希望 reducer 应用(它确实如此)并且我还希望减少的结果继续到处理管道中的下一步。如上所述,我在管道的后续步骤中没有看到任何东西,我不明白为什么。
从 Kafka 0.10.1.0
开始,所有聚合运算符都使用内部重复数据删除缓存来减少结果 KTable 变更日志流的负载。例如,如果您直接计数并处理两条具有相同键的记录,则完整的变更日志流将为 <key:1>, <key:2>
.
使用新的缓存功能,缓存会接收 <key:1>
并存储它,但不会立即将其发送到下游。当计算 <key:2>
时,它会替换缓存的第一个条目。根据缓存大小、不同键的数量、吞吐量和您的提交间隔,缓存向下游发送条目。这发生在单个键条目的缓存逐出或完全刷新缓存(向下游发送所有条目)时。因此,KTable 更新日志可能只显示 <key:2>
(因为 <key:1>
被删除了)。
您可以通过 Streams 配置参数 StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
控制缓存的大小。如果将该值设置为零,则完全禁用缓存并且 KTable 更改日志将包含所有更新(有效地提供 0.10.0.0
之前的行为)。
Confluent 文档中有一节更详细地解释了缓存:
给定以下代码:
KStream<String, Custom> stream =
builder.stream(Serdes.String(), customSerde, "test_in");
stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")
.print(Serdes.String(), customSerde);
我在 Reducer 的 apply 方法中有一个 println
语句,当我期望减少发生时它会成功打印出来。但是,上面显示的最终打印语句什么也不显示。同样,如果我使用 to
方法而不是 print
,我在目标主题中看不到任何消息。
在reduce 语句之后我需要什么才能看到减少的结果?如果一个值被推送到输入,我不希望看到任何东西。如果推送具有相同键的第二个值,我希望 reducer 应用(它确实如此)并且我还希望减少的结果继续到处理管道中的下一步。如上所述,我在管道的后续步骤中没有看到任何东西,我不明白为什么。
从 Kafka 0.10.1.0
开始,所有聚合运算符都使用内部重复数据删除缓存来减少结果 KTable 变更日志流的负载。例如,如果您直接计数并处理两条具有相同键的记录,则完整的变更日志流将为 <key:1>, <key:2>
.
使用新的缓存功能,缓存会接收 <key:1>
并存储它,但不会立即将其发送到下游。当计算 <key:2>
时,它会替换缓存的第一个条目。根据缓存大小、不同键的数量、吞吐量和您的提交间隔,缓存向下游发送条目。这发生在单个键条目的缓存逐出或完全刷新缓存(向下游发送所有条目)时。因此,KTable 更新日志可能只显示 <key:2>
(因为 <key:1>
被删除了)。
您可以通过 Streams 配置参数 StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
控制缓存的大小。如果将该值设置为零,则完全禁用缓存并且 KTable 更改日志将包含所有更新(有效地提供 0.10.0.0
之前的行为)。
Confluent 文档中有一节更详细地解释了缓存: