Kafka Stream 和 Consumer Group 怪异行为
Kafka Stream and Consumer Group Weird Behavior
我有两个高级问题被分解成更多单独的问题,这两个高级问题都涉及 Apache Kafka Streams API 正在创建和使用的消费者群体。
首先是kafka-consumer-group.sh
脚本的输出。我得到奇怪的输出,虽然他们似乎连接到特定的 group/topic/partition:
,但并没有真正告诉我特定消费者的位置
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
STANDARD_DATA 9 11 11 0 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer-4fd9dc15-d8a7-4598-85a9-3761ae6a747b/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer
STANDARD_DATA 0 4 11 7 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer-28e1c7bf-860d-44d6-bf58-5e0ff875587c/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer
STANDARD_DATA 4 - 10 - myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer
STANDARD_DATA 5 - 10 - myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer-a81f1399-1fc4-4579-b24f-fa8fee01fabf/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer
STANDARD_DATA 3 - 12 - myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer-6a83bfcc-2c6e-4e9d-a819-029ac8c6ae17/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer
STANDARD_DATA 8 12 12 0 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer-6d46bed3-70c4-4c7f-8e53-f9591192bc3f/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer
STANDARD_DATA 7 - 11 - myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer-5313315b-ded9-4fe7-ac9d-d8d5b20dd5b9/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer
STANDARD_DATA 2 10 10 0 myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer-c08a648f-548e-47a8-8bc5-7b6fa3bc1fb5/1.1.1.1 myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer
STANDARD_DATA 1 2 10 8 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer-08d99679-d430-4e9f-a3b9-11e558ca34a4/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer
STANDARD_DATA 6 - 12 - myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer-666040f8-d4d0-49e9-9db6-c6efee49ebe1/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer
- 为什么有些CURRENT-OFFSETS(第3列)和LAG(第4列)显示为'-',而我可以直接查询Kafka的API来区分它们实际上是赶上了吗?
(通过golang查询API)
4 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3 OFFSET: 10 LOG-END: 10 LAG: 0
- 此外,为什么一般情况下该偏移量不会如日志中所示显示(又名,它应该被赶上)?
我的第二个高级问题是关于流的问题。我们有一个流进程在随机工作(主要是在重启期间),重置为特定主题中可用的最早偏移量。在整个代码中没有 'reset',OFFSET_RESET 也没有被触及。我还可以确认我们没有使用 'exactly-once',所以我不确定这些偏移重置究竟在哪里发挥作用。
再一次,它基本上是:
流进程正在搅动数据,发生了一些事情~然后我们的偏移量回到地面 0,再次处理。在它也决定重置之前,这可能会持续数天到数周,因此正在提交偏移量。
关于 kafka-consumer-groups.sh
的输出:CURRENT-OFFSET 中的 -
表示该分区没有提交的偏移量。这意味着,也无法计算滞后(因此,您也在那里得到 -
)。
如果我没看错您的陈述,如果您使用 golang 查询偏移量,它会显示分区 4 位于偏移量 10,这与 kafka-consumer-groups.sh
显示的相反——不确定为什么会这样。 ..
关于重置偏移量:您可能需要增加代理配置 offsets.retention.minutes
-- 默认值为 24 小时(参见 https://docs.confluent.io/current/streams/faq.html#why-is-my-application-re-processing-data-from-the-beginning)。
另请注意,Streams API 使用默认重置策略 "earliest"(与默认使用 "latest" 的 Consumer API 形成对比)。您可以通过 StreamsConfig
在 Streams API 中更改重置策略:https://docs.confluent.io/current/streams/developer-guide.html#non-streams-configuration-parameters
我有两个高级问题被分解成更多单独的问题,这两个高级问题都涉及 Apache Kafka Streams API 正在创建和使用的消费者群体。
首先是kafka-consumer-group.sh
脚本的输出。我得到奇怪的输出,虽然他们似乎连接到特定的 group/topic/partition:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
STANDARD_DATA 9 11 11 0 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer-4fd9dc15-d8a7-4598-85a9-3761ae6a747b/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer
STANDARD_DATA 0 4 11 7 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer-28e1c7bf-860d-44d6-bf58-5e0ff875587c/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer
STANDARD_DATA 4 - 10 - myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer
STANDARD_DATA 5 - 10 - myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer-a81f1399-1fc4-4579-b24f-fa8fee01fabf/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer
STANDARD_DATA 3 - 12 - myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer-6a83bfcc-2c6e-4e9d-a819-029ac8c6ae17/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer
STANDARD_DATA 8 12 12 0 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer-6d46bed3-70c4-4c7f-8e53-f9591192bc3f/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer
STANDARD_DATA 7 - 11 - myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer-5313315b-ded9-4fe7-ac9d-d8d5b20dd5b9/1.1.1.1 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer
STANDARD_DATA 2 10 10 0 myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer-c08a648f-548e-47a8-8bc5-7b6fa3bc1fb5/1.1.1.1 myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer
STANDARD_DATA 1 2 10 8 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer-08d99679-d430-4e9f-a3b9-11e558ca34a4/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer
STANDARD_DATA 6 - 12 - myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer-666040f8-d4d0-49e9-9db6-c6efee49ebe1/1.1.1.1 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer
- 为什么有些CURRENT-OFFSETS(第3列)和LAG(第4列)显示为'-',而我可以直接查询Kafka的API来区分它们实际上是赶上了吗?
(通过golang查询API)
4 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3 OFFSET: 10 LOG-END: 10 LAG: 0
- 此外,为什么一般情况下该偏移量不会如日志中所示显示(又名,它应该被赶上)?
我的第二个高级问题是关于流的问题。我们有一个流进程在随机工作(主要是在重启期间),重置为特定主题中可用的最早偏移量。在整个代码中没有 'reset',OFFSET_RESET 也没有被触及。我还可以确认我们没有使用 'exactly-once',所以我不确定这些偏移重置究竟在哪里发挥作用。
再一次,它基本上是:
流进程正在搅动数据,发生了一些事情~然后我们的偏移量回到地面 0,再次处理。在它也决定重置之前,这可能会持续数天到数周,因此正在提交偏移量。
关于 kafka-consumer-groups.sh
的输出:CURRENT-OFFSET 中的 -
表示该分区没有提交的偏移量。这意味着,也无法计算滞后(因此,您也在那里得到 -
)。
如果我没看错您的陈述,如果您使用 golang 查询偏移量,它会显示分区 4 位于偏移量 10,这与 kafka-consumer-groups.sh
显示的相反——不确定为什么会这样。 ..
关于重置偏移量:您可能需要增加代理配置 offsets.retention.minutes
-- 默认值为 24 小时(参见 https://docs.confluent.io/current/streams/faq.html#why-is-my-application-re-processing-data-from-the-beginning)。
另请注意,Streams API 使用默认重置策略 "earliest"(与默认使用 "latest" 的 Consumer API 形成对比)。您可以通过 StreamsConfig
在 Streams API 中更改重置策略:https://docs.confluent.io/current/streams/developer-guide.html#non-streams-configuration-parameters