会话超时时的 Kafka Listener 回滚事务

Kafka Listener rollback transaction on session timeout

我正在使用 spring-kafka 版本 1.1.3 来使用主题中的消息。在消费者配置中,自动提交设置为 truemax.poll.records 设置为 10session.timeout.ms 与服务器协商为 10 秒。

收到消息后,我将其中的一部分保存到数据库中。我的数据库有时会很慢,这会导致 kafka 监听器的会话超时:

Auto offset commit failed for group mygroup: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

由于我无法增加服务器上的会话超时并且 max.poll.records 已经下降到 10,我希望能够将我的数据库调用包装在一个事务中,该事务将回滚kafka 会话超时的情况。

这可能吗?我该如何实现?

很遗憾,我无法在文档中找到解决方案。

你得考虑升级到SpringKafka1.2和Kafka0.10.x。旧的 Apache Kafka 在心跳方面存在缺陷。因此,使用 autoCommit 和缓慢的侦听器,您最终会遇到意外的重新平衡,并且您自己会遇到这样的问题。您使用的 Spring Kafka 版本的逻辑如下:

// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
if (this.autoCommit) {
    invokeListener(records);
}
else {
    if (sendToListener(records)) {
        if (this.assignedPartitions != null) {
            // avoid group management rebalance due to a slow
            // consumer
            this.consumer.pause(this.assignedPartitions);
            this.paused = true;
            this.unsent = records;
        }
    }
}

因此,您可以考虑关闭 autoCommit 并依赖默认打开的内置 pause 功能。

决定升级到 Kafka 0.11,因为它添加了事务支持(参见 Release Notes)。