Kafka 流 - 第一个示例 WordCount 第一圈计数不正确
Kafka streams - First example WordCount doesn't count correctly the first lap
我正在研究 Kafka Streams,我对 Java 8 中的第一个 WordCount 示例有疑问,摘自文档。
使用最新可用版本的 kafka 流、Kafka Connect 和 WordCount lambda 表达式示例。
我遵循以下步骤:
我在 Kafka 中创建了一个输入主题和一个输出主题。启动应用程序流式传输,然后通过从 .txt 文件中插入一些词来上传输入主题
第一次计数时,在输出主题中我看到正确分组的单词,但计数是错误的。如果我尝试重新插入相同的单词,则之前错误计数的连续计数都是正确的。
如果我使用消费者控制台查看输入主题转储,它已正确加载并且没有脏数据。
怎么第一次数错了?
示例[第一个数据]:
(在Kafka中输入Topic)
嗨嗨
迈克迈克
测试
(应用流是 运行)
(output Topic) hi 12 mike 4 test 3 (causal counts)
[连续数据 - 在输入主题中发布相同的词]
(输出主题) hi 14 mike 6 test 4
[新尝试]
(输出主题) hi 16 mike 8 test 5
等等....
Apache Kafka 中的 WordCount 演示有 the following lines:
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
这意味着,当您重新启动应用程序时,它将从头开始读取其输入主题 ("earliest") iff 没有现有的消费者偏移量存储在 Kafka 中的 WordCount 应用程序。在一定数量的应用不活动后,应用的消费者补偿在 Kafka 中过期,默认值为 24 小时(参见 offsets.retention.minutes
broker configuration)。
我可以想象发生了以下情况:
- 您之前曾尝试过 Kafka,并将测试数据输入到输入主题中。
- 然后您在恢复实验之前休息了 24 小时以上。
- 现在,应用程序在重新启动时会恢复为从头开始重新读取输入主题,从而获取较旧的测试输入数据,从而导致 "inflated" 计数。
If I Looking the input topic dump with a consumer console, it's loaded properly and there are no dirty data.
您可以在添加 CLI 选项 --from-beginning
(参见 https://kafka.apache.org/documentation/#quickstart_consume)的同时再次查看控制台用户的输入主题来验证我的上述假设。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning
这将显示主题 "yourInputTopic" 中的所有可用数据——减去可能同时从 Kafka 主题中清除的任何数据(默认代理配置将清除早于7 天,参见 log.retention.hours
).
我正在研究 Kafka Streams,我对 Java 8 中的第一个 WordCount 示例有疑问,摘自文档。
使用最新可用版本的 kafka 流、Kafka Connect 和 WordCount lambda 表达式示例。
我遵循以下步骤: 我在 Kafka 中创建了一个输入主题和一个输出主题。启动应用程序流式传输,然后通过从 .txt 文件中插入一些词来上传输入主题
第一次计数时,在输出主题中我看到正确分组的单词,但计数是错误的。如果我尝试重新插入相同的单词,则之前错误计数的连续计数都是正确的。
如果我使用消费者控制台查看输入主题转储,它已正确加载并且没有脏数据。
怎么第一次数错了?
示例[第一个数据]: (在Kafka中输入Topic) 嗨嗨 迈克迈克 测试
(应用流是 运行)
(output Topic) hi 12 mike 4 test 3 (causal counts)
[连续数据 - 在输入主题中发布相同的词]
(输出主题) hi 14 mike 6 test 4
[新尝试]
(输出主题) hi 16 mike 8 test 5
等等....
Apache Kafka 中的 WordCount 演示有 the following lines:
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
这意味着,当您重新启动应用程序时,它将从头开始读取其输入主题 ("earliest") iff 没有现有的消费者偏移量存储在 Kafka 中的 WordCount 应用程序。在一定数量的应用不活动后,应用的消费者补偿在 Kafka 中过期,默认值为 24 小时(参见 offsets.retention.minutes
broker configuration)。
我可以想象发生了以下情况:
- 您之前曾尝试过 Kafka,并将测试数据输入到输入主题中。
- 然后您在恢复实验之前休息了 24 小时以上。
- 现在,应用程序在重新启动时会恢复为从头开始重新读取输入主题,从而获取较旧的测试输入数据,从而导致 "inflated" 计数。
If I Looking the input topic dump with a consumer console, it's loaded properly and there are no dirty data.
您可以在添加 CLI 选项 --from-beginning
(参见 https://kafka.apache.org/documentation/#quickstart_consume)的同时再次查看控制台用户的输入主题来验证我的上述假设。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning
这将显示主题 "yourInputTopic" 中的所有可用数据——减去可能同时从 Kafka 主题中清除的任何数据(默认代理配置将清除早于7 天,参见 log.retention.hours
).