kafkaproducer/consumer重启后消费者收不到消息

Consumer does not receive messages after kafka producer/consumer restart

我们有一个生产者和一个消费者和一个分区。 consumer/producer 都是 spring 启动应用程序。消费者应用程序在我的本地计算机上运行,​​而生产者与远程计算机上的 kafka 和动物园管理员一起运行。

在开发过程中,我重新部署了生产者应用程序并进行了一些更改。但在那之后我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。可能是什么问题and/or如何解决?

消费者配置:

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input:
          destination: sales
          content-type: application/json
      kafka:
        binder:
          brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          defaultZkPort: 2181
          defaultBrokerPort: 9092
server:
  port: 0

生产者配置:

cloud:
stream:
  defaultBinder: kafka
  bindings:
    output:
      destination: sales
      content-type: application/json
  kafka:
    binder:
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      defaultZkPort: 2181
      defaultBrokerPort: 9092

EDIT2:

消费者应用程序在 5 分钟后死亡,出现以下异常:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255  WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256  INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel

看看上面关于 DEBUG 的建议是否揭示了任何进一步的信息。看起来您从 KafkaTopicProvisioner 获得了一些超时异常。但是,当您重新启动我假设的消费者时,就会发生这种情况。看起来消费者在与经纪人沟通时遇到了一些问题,您需要找出那里发生了什么。

嗯,看起来已经有一个 bug 报告 spring-cloud-stream-binder-kafka 说明 resetOffset 属性 没有效果。因此,消费者总是请求偏移量为 latest.

的消息

如 git 问题所述,唯一的解决方法是通过 kafka 消费者 CLI 工具解决此问题。