会话超时时的 Kafka Listener 回滚事务
Kafka Listener rollback transaction on session timeout
我正在使用 spring-kafka
版本 1.1.3
来使用主题中的消息。在消费者配置中,自动提交设置为 true
,max.poll.records
设置为 10
。 session.timeout.ms
与服务器协商为 10 秒。
收到消息后,我将其中的一部分保存到数据库中。我的数据库有时会很慢,这会导致 kafka 监听器的会话超时:
Auto offset commit failed for group mygroup: Commit cannot be completed
since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to
poll() was longer than the configured session.timeout.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session
timeout or by reducing the maximum size of batches returned in poll()
with max.poll.records.
由于我无法增加服务器上的会话超时并且 max.poll.records
已经下降到 10,我希望能够将我的数据库调用包装在一个事务中,该事务将回滚kafka 会话超时的情况。
这可能吗?我该如何实现?
很遗憾,我无法在文档中找到解决方案。
你得考虑升级到SpringKafka1.2
和Kafka0.10.x
。旧的 Apache Kafka 在心跳方面存在缺陷。因此,使用 autoCommit
和缓慢的侦听器,您最终会遇到意外的重新平衡,并且您自己会遇到这样的问题。您使用的 Spring Kafka 版本的逻辑如下:
// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
if (this.autoCommit) {
invokeListener(records);
}
else {
if (sendToListener(records)) {
if (this.assignedPartitions != null) {
// avoid group management rebalance due to a slow
// consumer
this.consumer.pause(this.assignedPartitions);
this.paused = true;
this.unsent = records;
}
}
}
因此,您可以考虑关闭 autoCommit
并依赖默认打开的内置 pause
功能。
决定升级到 Kafka 0.11,因为它添加了事务支持(参见 Release Notes)。
我正在使用 spring-kafka
版本 1.1.3
来使用主题中的消息。在消费者配置中,自动提交设置为 true
,max.poll.records
设置为 10
。 session.timeout.ms
与服务器协商为 10 秒。
收到消息后,我将其中的一部分保存到数据库中。我的数据库有时会很慢,这会导致 kafka 监听器的会话超时:
Auto offset commit failed for group mygroup: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
由于我无法增加服务器上的会话超时并且 max.poll.records
已经下降到 10,我希望能够将我的数据库调用包装在一个事务中,该事务将回滚kafka 会话超时的情况。
这可能吗?我该如何实现?
很遗憾,我无法在文档中找到解决方案。
你得考虑升级到SpringKafka1.2
和Kafka0.10.x
。旧的 Apache Kafka 在心跳方面存在缺陷。因此,使用 autoCommit
和缓慢的侦听器,您最终会遇到意外的重新平衡,并且您自己会遇到这样的问题。您使用的 Spring Kafka 版本的逻辑如下:
// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
if (this.autoCommit) {
invokeListener(records);
}
else {
if (sendToListener(records)) {
if (this.assignedPartitions != null) {
// avoid group management rebalance due to a slow
// consumer
this.consumer.pause(this.assignedPartitions);
this.paused = true;
this.unsent = records;
}
}
}
因此,您可以考虑关闭 autoCommit
并依赖默认打开的内置 pause
功能。
决定升级到 Kafka 0.11,因为它添加了事务支持(参见 Release Notes)。