kafkaproducer/consumer重启后消费者收不到消息
Consumer does not receive messages after kafka producer/consumer restart
我们有一个生产者和一个消费者和一个分区。 consumer/producer 都是 spring 启动应用程序。消费者应用程序在我的本地计算机上运行,而生产者与远程计算机上的 kafka 和动物园管理员一起运行。
在开发过程中,我重新部署了生产者应用程序并进行了一些更改。但在那之后我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。可能是什么问题and/or如何解决?
消费者配置:
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
input:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
server:
port: 0
生产者配置:
cloud:
stream:
defaultBinder: kafka
bindings:
output:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
EDIT2:
消费者应用程序在 5 分钟后死亡,出现以下异常:
2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel
看看上面关于 DEBUG 的建议是否揭示了任何进一步的信息。看起来您从 KafkaTopicProvisioner 获得了一些超时异常。但是,当您重新启动我假设的消费者时,就会发生这种情况。看起来消费者在与经纪人沟通时遇到了一些问题,您需要找出那里发生了什么。
嗯,看起来已经有一个 bug 报告 spring-cloud-stream-binder-kafka
说明 resetOffset
属性 没有效果。因此,消费者总是请求偏移量为 latest
.
的消息
如 git 问题所述,唯一的解决方法是通过 kafka 消费者 CLI 工具解决此问题。
我们有一个生产者和一个消费者和一个分区。 consumer/producer 都是 spring 启动应用程序。消费者应用程序在我的本地计算机上运行,而生产者与远程计算机上的 kafka 和动物园管理员一起运行。
在开发过程中,我重新部署了生产者应用程序并进行了一些更改。但在那之后我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。可能是什么问题and/or如何解决?
消费者配置:
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
input:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
server:
port: 0
生产者配置:
cloud:
stream:
defaultBinder: kafka
bindings:
output:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
EDIT2:
消费者应用程序在 5 分钟后死亡,出现以下异常:
2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel
看看上面关于 DEBUG 的建议是否揭示了任何进一步的信息。看起来您从 KafkaTopicProvisioner 获得了一些超时异常。但是,当您重新启动我假设的消费者时,就会发生这种情况。看起来消费者在与经纪人沟通时遇到了一些问题,您需要找出那里发生了什么。
嗯,看起来已经有一个 bug 报告 spring-cloud-stream-binder-kafka
说明 resetOffset
属性 没有效果。因此,消费者总是请求偏移量为 latest
.
如 git 问题所述,唯一的解决方法是通过 kafka 消费者 CLI 工具解决此问题。