卡夫卡 INVALID_FETCH_SESSION_EPOCH

Kafka INVALID_FETCH_SESSION_EPOCH

我们正在使用带有 kafka 流应用程序的 kafka 代理设置,该应用程序 运行s 使用 Spring 云流 kafka。虽然看起来 运行 没问题,但我们确实在日志中收到以下错误语句:

2019-02-21 22:37:20,253 INFO kafka-coordinator-heartbeat-thread | anomaly-timeline org.apache.kafka.clients.FetchSessionHandler [Consumer clientId=anomaly-timeline-56dc4481-3086-4359-a8e8-d2dae12272a2-StreamThread-1-consumer, groupId=anomaly-timeline] Node 2 was unable to process the fetch request with (sessionId=1290440723, epoch=2089): INVALID_FETCH_SESSION_EPOCH. 

我在网上搜索过,但关于这个错误的信息不多。我猜这可能与代理和消费者之间的时间设置不同有关,但两台机器具有相同的时间服务器设置。

知道如何解决这个问题吗?

从 1.1.0 版本开始在 KIP-227 中引入了获取会话的概念:https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

作为副本追随者的 Kafka 代理从领导者那里获取消息。为了避免每次为所有分区发送完整的元数据,只有那些发生变化的分区才会在同一个提取会话中发送。

当我们查看 Kafka 的代码时,我们可以看到一个示例,返回时:

if (session.epoch != expectedEpoch) {
        info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
          s"got ${session.epoch}.  Possible duplicate request.")
        new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, 0, session.id)
      } else {

来源:https://github.com/axbaretto/kafka/blob/ab2212c45daa841c2f16e9b1697187eb0e3aec8c/core/src/main/scala/kafka/server/FetchSession.scala#L493

一般来说,如果您没有数千个分区,同时这种情况也不会经常发生,那么您不必担心。

确实,正如zen在评论中指出的那样,当滚动或retention-based删除发生时,您会收到此消息。如果它不一直发生,这不是问题。如果是,请检查您的 log.rolllog.retention 配置。

这似乎是由 Kafka-8052 问题引起的,该问题已针对 Kafka 2.3.0

修复

在我们的案例中,根本原因是 kafka Broker - 客户端不兼容。如果您的集群落后于客户端版本,您可能会看到诸如此类的各种奇怪问题。

我们的 kafka broker 在 1.x.x 上,我们的 kafka-consumer 在 2.x.x 上。一旦我们将 spring-cloud-dependencies 降级为 Finchley.RELEASE,我们的问题就解决了。

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Finchley.RELEASE"
    }
}

正在将客户端版本更新到 2.3(来自代理的相同版本)为我修复它。