使用自动提交时消费者重新平衡
Consumer rebalance while using autocommit
我们使用具有以下配置的消费者 kafka 客户端 0.10.2.0:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
如您所见,我们正在使用自动提交。
我们使用的消费者 API 版本有一个专门的线程来执行自动提交。
所以每一秒我们都有一个自动提交,这意味着我们每一秒都有一个心跳。
我们的申请处理时间实际上可能需要(不时)超过 40 秒(请求超时间隔)
我想问的是:
1 - 如果处理时间需要,例如,一分钟。尽管每秒都有自动提交的 heartbean,但还会有重新平衡吗?
2 - 更奇怪的是,如果执行时间较长,我们似乎不止一次收到相同的消息。正常吗?如果消费者提交了一个偏移量,为什么重新平衡会再次使用相同的偏移量?
谢谢,
奥勒尔
澄清一下,每次轮询都会调用 AutoCommit 检查,它会检查经过的时间是否大于配置的时间,如果是,那么它只会执行提交
例如。如果提交间隔为 5 秒并且轮询在 7 秒内发生,在这种情况下,提交将在 7 秒后发生
针对您的问题
自动提交不计入心跳,如果处理时间很长,那么显然不会发生提交,并会导致会话超时,进而触发重新平衡
这不应该发生,除非你是seeking/resetting先前提交的偏移量的偏移量或消费者重新平衡发生
从Kafka v0.10.1.0开始,你不需要手动触发自动提交来做心跳。 Kafka消费者自己在后台发起一个新线程用于心跳机制。要了解更多信息,请阅读 KIP-62。
在您的情况下,您可以将 max.poll.interval.ms
设置为处理器处理 max.poll.record
条记录所花费的最长时间。
您可以使用 KafkaConsumer.pause()
/ KafkaConsumer.resume()
来防止消费者在长时间处理暂停期间重新平衡。 JavaDocs. Take a look at 个问题。
回复 2。您确定这些偏移量已提交吗?
我们使用具有以下配置的消费者 kafka 客户端 0.10.2.0:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
如您所见,我们正在使用自动提交。 我们使用的消费者 API 版本有一个专门的线程来执行自动提交。 所以每一秒我们都有一个自动提交,这意味着我们每一秒都有一个心跳。
我们的申请处理时间实际上可能需要(不时)超过 40 秒(请求超时间隔)
我想问的是:
1 - 如果处理时间需要,例如,一分钟。尽管每秒都有自动提交的 heartbean,但还会有重新平衡吗?
2 - 更奇怪的是,如果执行时间较长,我们似乎不止一次收到相同的消息。正常吗?如果消费者提交了一个偏移量,为什么重新平衡会再次使用相同的偏移量?
谢谢, 奥勒尔
澄清一下,每次轮询都会调用 AutoCommit 检查,它会检查经过的时间是否大于配置的时间,如果是,那么它只会执行提交
例如。如果提交间隔为 5 秒并且轮询在 7 秒内发生,在这种情况下,提交将在 7 秒后发生
针对您的问题
自动提交不计入心跳,如果处理时间很长,那么显然不会发生提交,并会导致会话超时,进而触发重新平衡
这不应该发生,除非你是seeking/resetting先前提交的偏移量的偏移量或消费者重新平衡发生
从Kafka v0.10.1.0开始,你不需要手动触发自动提交来做心跳。 Kafka消费者自己在后台发起一个新线程用于心跳机制。要了解更多信息,请阅读 KIP-62。
在您的情况下,您可以将 max.poll.interval.ms
设置为处理器处理 max.poll.record
条记录所花费的最长时间。
您可以使用 KafkaConsumer.pause()
/ KafkaConsumer.resume()
来防止消费者在长时间处理暂停期间重新平衡。 JavaDocs. Take a look at
回复 2。您确定这些偏移量已提交吗?