Zookeeper in Spring Cloud Dataflow keeps disconnecting
I am trying to start SCDF using Docker. It starts up fine, but when I try to deploy any stream, I get the following error in the application logs:
2017-03-09 15:19:45.864 INFO 62 --- [ main] kafka.utils.VerifiableProperties : Verifying properties
2017-03-09 15:19:45.864 INFO 62 --- [ main] kafka.utils.VerifiableProperties : Property client.id is overridden to groupid
2017-03-09 15:19:45.866 INFO 62 --- [ main] kafka.utils.VerifiableProperties : Property metadata.broker.list is overridden to kafka:9092
2017-03-09 15:19:45.867 INFO 62 --- [ main] kafka.utils.VerifiableProperties : Property request.timeout.ms is overridden to 10000
2017-03-09 15:19:45.868 INFO 62 --- [ main] kafka.client.ClientUtils$ : Fetching metadata from broker id:0,host:kafka,port:9092 with correlation id 0 for 1 topic(s) Set(test1.time)
2017-03-09 15:19:45.868 INFO 62 --- [ main] kafka.producer.SyncProducer : Connected to kafka:9092 for producing
2017-03-09 15:19:45.872 INFO 62 --- [ main] kafka.producer.SyncProducer : Disconnecting from kafka:9092
2017-03-09 15:19:45.889 ERROR 62 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Cannot initialize Binder
java.lang.IllegalArgumentException: Broker cannot be null
at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.integration.kafka.core.BrokerAddress.<init>(BrokerAddress.java:48) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.core.MetadataCache.getLeader(MetadataCache.java:78) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:271) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:266) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at com.gs.collections.impl.block.procedure.MapCollectProcedure.value(MapCollectProcedure.java:51) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.internal.IteratorIterate.forEach(IteratorIterate.java:648) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.internal.IterableIterate.forEach(IterableIterate.java:481) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.Iterate.forEach(Iterate.java:126) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.Iterate.addToMap(Iterate.java:2493) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.Iterate.toMap(Iterate.java:2467) ~[gs-collections-5.0.0.jar!/:na]
at org.springframework.integration.kafka.core.DefaultConnectionFactory.getLeaders(DefaultConnectionFactory.java:98) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doWithRetry(KafkaMessageChannelBinder.java:417) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doWithRetry(KafkaMessageChannelBinder.java:405) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ensureTopicCreated(KafkaMessageChannelBinder.java:405) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:296) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:121) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:184) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binding.ChannelBindingService.bindProducer(ChannelBindingService.java:113) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:206) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:57) [spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:51) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:852) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541) [spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:766) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.createAndRefreshContext(SpringApplication.java:361) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1191) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1180) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.cloud.stream.app.time.source.kafka.TimeSourceKafkaApplication.main(TimeSourceKafkaApplication.java:29) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
2017-03-09 15:19:45.890 INFO 62 --- [-zookeeper:2181] org.I0Itec.zkclient.ZkEventThread : Terminate ZkClient event thread.
2017-03-09 15:19:45.892 INFO 62 --- [ main] org.apache.zookeeper.ZooKeeper : Session: 0x15ab3a17705000a closed
2017-03-09 15:19:45.893 INFO 62 --- [ain-EventThread] org.apache.zookeeper.ClientCnxn : EventThread shut down
2017-03-09 15:19:45.893 WARN 62 --- [ main] s.c.a.AnnotationConfigApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-03-09 15:19:45.897 INFO 62 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2017-03-09 15:19:45.899 INFO 62 --- [ main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@3e47b405: startup date [Thu Mar 09 15:19:36 UTC 2017]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@7c24a24b
2017-03-09 15:19:45.911 ERROR 62 --- [ main] o.s.boot.SpringApplication : Application startup failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:51) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:852) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:766) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.createAndRefreshContext(SpringApplication.java:361) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1191) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1180) [spring-boot-1.3.5.RELEASE.jar!/:1.3.5.RELEASE]
at org.springframework.cloud.stream.app.time.source.kafka.TimeSourceKafkaApplication.main(TimeSourceKafkaApplication.java:29) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54) [time-source-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ensureTopicCreated(KafkaMessageChannelBinder.java:425) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:296) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doBindProducer(KafkaMessageChannelBinder.java:121) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:184) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binding.ChannelBindingService.bindProducer(ChannelBindingService.java:113) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:206) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:57) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
... 18 common frames omitted
Caused by: java.lang.IllegalArgumentException: Broker cannot be null
at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.2.6.RELEASE.jar!/:4.2.6.RELEASE]
at org.springframework.integration.kafka.core.BrokerAddress.<init>(BrokerAddress.java:48) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.core.MetadataCache.getLeader(MetadataCache.java:78) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:271) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.core.DefaultConnectionFactory$GetBrokersByPartitionFunction.valueOf(DefaultConnectionFactory.java:266) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at com.gs.collections.impl.block.procedure.MapCollectProcedure.value(MapCollectProcedure.java:51) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.internal.IteratorIterate.forEach(IteratorIterate.java:648) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.internal.IterableIterate.forEach(IterableIterate.java:481) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.Iterate.forEach(Iterate.java:126) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.Iterate.addToMap(Iterate.java:2493) ~[gs-collections-5.0.0.jar!/:na]
at com.gs.collections.impl.utility.Iterate.toMap(Iterate.java:2467) ~[gs-collections-5.0.0.jar!/:na]
at org.springframework.integration.kafka.core.DefaultConnectionFactory.getLeaders(DefaultConnectionFactory.java:98) ~[spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doWithRetry(KafkaMessageChannelBinder.java:417) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doWithRetry(KafkaMessageChannelBinder.java:405) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154) ~[spring-retry-1.1.2.RELEASE.jar!/:na]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ensureTopicCreated(KafkaMessageChannelBinder.java:405) ~[spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
... 25 common frames omitted
2017-03-09 15:19:45.915 INFO 62 --- [ main] .b.l.ClasspathLoggingApplicationListener : Application failed to start with classpath:...
I am using spring-cloud-dataflow-server-local-1.1.4. In application.properties:
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.zookeeper.connect-string=zookeeper:2181
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=kafka:9092
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=zookeeper:2181
spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow=15000
On subsequent runs I also get:
Caused by: kafka.common.KafkaException: fetching topic metadata for topics [Set(test2.transform)] from broker [List()] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) ~[kafka_2.10-0.8.2.2.jar!/:na]
...
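Your application properties look fine for the connection itself. You are using Kafka and Zookeeper as the messaging pipeline for specific topics, and those topics come with properties that you need to configure in your docker-compose file (if you have one) or in your application properties.
For example, if a service is meant to act as a message producer, its binding should be configured with the topic (destination) name and, if applicable, a group.
The error can also be caused by incorrect broker settings that prevent the application from reading your topics.
Below is a Kafka topic declaration for a buy/sell scenario: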
spring.cloud.stream.kafka.bindings.costPrice.destination:abc.company.costPrice
spring.cloud.stream.kafka.bindings.costPrice.content-type:application/json
spring.cloud.stream.kafka.bindings.sellPrice.destination:abc.company.sellPrice
spring.cloud.stream.kafka.bindings.sellPrice.content-type:application/json
spring.cloud.stream.kafka.bindings.sellPrice.group:sellPriceGroup
The topics need to be configured as well. Also check that the network ports for Zookeeper and Kafka are reachable, and add the autoCreateTopics:true option.
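The port check suggested above could be sketched roughly as follows. This is only an illustration, not part of SCDF or Kafka tooling: the helper names `can_connect` and `check_endpoints` are made up here, and the host names and ports are the ones from the question's application.properties. It has to be run from the same place as the deployed stream app (for example inside the same container or Docker network), because names like `kafka` and `zookeeper` only resolve there.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts alike.
        return False


def check_endpoints(endpoints, timeout: float = 3.0) -> dict:
    """Map each "host:port" string to whether it accepted a TCP connection."""
    return {f"{host}:{port}": can_connect(host, port, timeout) for host, port in endpoints}


# Example (host names and ports assumed from the question's configuration):
# print(check_endpoints([("zookeeper", 2181), ("kafka", 9092)]))
```

Note that a successful TCP connection only shows that the port is open. In the log above the app does connect to kafka:9092 and still fails with "Broker cannot be null", so a passing check does not rule out a problem with the broker metadata itself.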
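I don't know your current setup, or whether Zookeeper and Kafka are started properly before application.properties is read. You could also try replacing zookeeper:2181 with machine-hostname:2181, and likewise for Kafka (for example pavan.network.co.in:2181).
Hope this helps.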
I don't know why, but replacing the Docker images zookeeper and wurstmeister/kafka:latest with spotify/kafka (which contains both Kafka and Zookeeper) made the whole setup work.