Kafka 流 - 第一个示例 WordCount 第一圈计数不正确

Kafka streams - First example WordCount doesn't count correctly the first lap

我正在研究 Kafka Streams,我对 Java 8 中的第一个 WordCount 示例有疑问,摘自文档。

使用最新可用版本的 kafka 流、Kafka Connect 和 WordCount lambda 表达式示例。

我遵循以下步骤: 我在 Kafka 中创建了一个输入主题和一个输出主题。启动应用程序流式传输,然后通过从 .txt 文件中插入一些词来上传输入主题

第一次计数时,在输出主题中我看到正确分组的单词,但计数是错误的。如果我尝试重新插入相同的单词,则之前错误计数的连续计数都是正确的。

如果我使用消费者控制台查看输入主题转储,它已正确加载并且没有脏数据。

怎么第一次数错了?

示例[第一个数据]: (在Kafka中输入Topic) 嗨嗨 迈克迈克 测试

(应用流是 运行)

(output Topic) hi 12 mike 4 test 3 (causal counts)

[连续数据 - 在输入主题中发布相同的词]

(输出主题) hi 14 mike 6 test 4

[新尝试]

(输出主题) hi 16 mike 8 test 5

等等....

Apache Kafka 中的 WordCount 演示有 the following lines:

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

这意味着,当您重新启动应用程序时,它将从头开始读取其输入主题 ("earliest") iff​​ 没有现有的消费者偏移量存储在 Kafka 中的 WordCount 应用程序。在一定数量的应用不活动后,应用的消费者补偿在 Kafka 中过期,默认值为 24 小时(参见 offsets.retention.minutes broker configuration)。

我可以想象发生了以下情况:

  • 您之前曾尝试过 Kafka,并将测试数据输入到输入主题中。
  • 然后您在恢复实验之前休息了 24 小时以上。
  • 现在,应用程序在重新启动时会恢复为从头开始重新读取输入主题,从而获取较旧的测试输入数据,从而导致 "inflated" 计数。

If I Looking the input topic dump with a consumer console, it's loaded properly and there are no dirty data.

您可以在添加 CLI 选项 --from-beginning(参见 https://kafka.apache.org/documentation/#quickstart_consume)的同时再次查看控制台用户的输入主题来验证我的上述假设。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning

这将显示主题 "yourInputTopic" 中的所有可用数据——减去可能同时从 Kafka 主题中清除的任何数据(默认代理配置将清除早于7 天,参见 log.retention.hours).