o.apache.kafka.common.network.Selector : I/O 错误 localhost/127.0.0.1
o.apache.kafka.common.network.Selector : Error in I/O with localhost/127.0.0.1
我的应用程序在一台机器上使用来自 Kafka 服务器 运行 的消息,然后将它们转发到其他实例上的另一个远程 Kafka 运行。在我将我的应用程序部署到 Cloud Foundry 并将消息发送到第一个 Kafka 服务器后,该应用程序按预期运行。消息被消费并将其转发到远程 Kafka。
然而在那之后我在 Cloud Foundry 中得到了异常的无限循环(在我的本地机器上也以较慢的速度):
堆栈跟踪:
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT 2016-06-03 18:20:34.900 WARN 29 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Error in I/O with localhost/127.0.0.1
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT java.net.ConnectException: Connection refused
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65-]
我的应用程序yaml文件是这样的
应用程序 YML:
spring:
cloud:
stream:
bindings:
activationMsgQueue:
binder: kafka1
destination: test
contentType: application/json
consumer:
resetOffsets: true
startOffset: latest
input:
binder: kafka2
content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
destination: test
group: prac
binders:
kafka1:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a1p.sys.comcast.net
kafka2:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a3p.sys.comcast.net
default-binder: kafka2
kafka:
binder:
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
我观察到如果我包含下面的配置,错误就会消失,但现在我有一个无限循环的消息被消费和发送。
片段:
kafka:
binder:
brokers: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
我需要做什么来停止这个无限循环?
您好 Marius,感谢您响应 SOS 呼叫。我对上述问题进行了改进。流程现在从 a1p(topic:test) 消费,如果消息有效则转发到 a3p(topic:test),否则将错误消息发送到 a1p(topic:errorMsgQueue)。我有以下应用程序。 yml 文件
spring:
云:
溪流:
绑定:
错误消息队列:
活页夹:kafka1
目的地:errorMsgQueue
内容类型:application/json
输入:
活页夹:kafka2
内容类型:application/x-java-object;类型=com.comcast.activation.message.vo.ActivationDataInfo
目的地:测试
群组:prac
激活消息队列:
活页夹:kafka3
目的地:测试
内容类型:application/json
活页夹:
卡夫卡1:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a1p.sys.comcast.net
zk 节点:caapmsg-as-a1p.sys.comcast.net
卡夫卡2:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a3p.sys.comcast.net
zk 节点:caapmsg-as-a3p.sys.comcast.net
卡夫卡3:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a1p.sys.comcast.net
zk 节点:caapmsg-as-a1p.sys.comcast.net
默认绑定器:kafka2
我仍然遇到无限循环。我做错了什么?
spring.kafka.host
不是 Spring Cloud Stream 的有效配置选项。 http://docs.spring.io/spring-cloud-stream/docs/1.0.0.RELEASE/reference/htmlsingle/index.html#_kafka_binder_properties 是活页夹支持的唯一属性。
此外,您的应用程序似乎混合了两个集群的配置。 (我假设它们是独立的集群?)
应该是这样的:
spring:
云:
溪流:
绑定:
激活消息队列:
活页夹:kafka1
目的地:测试
内容类型:application/json
消费者:
重置偏移量:真
startOffset:最新
输入:
活页夹:kafka2
内容类型:application/x-java-object;类型=com.comcast.activation.message.vo.ActivationDataInfo
目的地:测试
群组:prac<br>
活页夹:
卡夫卡1:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a1p.sys.comcast.net
zk 节点:caapmsg-as-a1p.sys.comcast.net
卡夫卡2:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a3p.sys.comcast.net
zk 节点:caapmsg-as-a3p.sys.comcast.net
默认绑定器:kafka2
有关详细信息,请参阅此示例 https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/multibinder-differentsystems/src/main/resources/application.yml。
我怀疑无限循环是由于向同一主题发送和接收消息而引起的。
我的应用程序在一台机器上使用来自 Kafka 服务器 运行 的消息,然后将它们转发到其他实例上的另一个远程 Kafka 运行。在我将我的应用程序部署到 Cloud Foundry 并将消息发送到第一个 Kafka 服务器后,该应用程序按预期运行。消息被消费并将其转发到远程 Kafka。
然而在那之后我在 Cloud Foundry 中得到了异常的无限循环(在我的本地机器上也以较慢的速度):
堆栈跟踪:
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT 2016-06-03 18:20:34.900 WARN 29 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Error in I/O with localhost/127.0.0.1
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT java.net.ConnectException: Connection refused
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65-]
我的应用程序yaml文件是这样的
应用程序 YML:
spring:
cloud:
stream:
bindings:
activationMsgQueue:
binder: kafka1
destination: test
contentType: application/json
consumer:
resetOffsets: true
startOffset: latest
input:
binder: kafka2
content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
destination: test
group: prac
binders:
kafka1:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a1p.sys.comcast.net
kafka2:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a3p.sys.comcast.net
default-binder: kafka2
kafka:
binder:
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
我观察到如果我包含下面的配置,错误就会消失,但现在我有一个无限循环的消息被消费和发送。
片段:
kafka:
binder:
brokers: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
我需要做什么来停止这个无限循环?
您好 Marius,感谢您响应 SOS 呼叫。我对上述问题进行了改进。流程现在从 a1p(topic:test) 消费,如果消息有效则转发到 a3p(topic:test),否则将错误消息发送到 a1p(topic:errorMsgQueue)。我有以下应用程序。 yml 文件
spring:
云:
溪流:
绑定:
错误消息队列:
活页夹:kafka1
目的地:errorMsgQueue
内容类型:application/json
输入:
活页夹:kafka2
内容类型:application/x-java-object;类型=com.comcast.activation.message.vo.ActivationDataInfo
目的地:测试
群组:prac
激活消息队列:
活页夹:kafka3
目的地:测试
内容类型:application/json
活页夹:
卡夫卡1:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a1p.sys.comcast.net
zk 节点:caapmsg-as-a1p.sys.comcast.net
卡夫卡2:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a3p.sys.comcast.net
zk 节点:caapmsg-as-a3p.sys.comcast.net
卡夫卡3:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a1p.sys.comcast.net
zk 节点:caapmsg-as-a1p.sys.comcast.net
默认绑定器:kafka2
我仍然遇到无限循环。我做错了什么?
spring.kafka.host
不是 Spring Cloud Stream 的有效配置选项。 http://docs.spring.io/spring-cloud-stream/docs/1.0.0.RELEASE/reference/htmlsingle/index.html#_kafka_binder_properties 是活页夹支持的唯一属性。
此外,您的应用程序似乎混合了两个集群的配置。 (我假设它们是独立的集群?)
应该是这样的:
spring:
云:
溪流:
绑定:
激活消息队列:
活页夹:kafka1
目的地:测试
内容类型:application/json
消费者:
重置偏移量:真
startOffset:最新
输入:
活页夹:kafka2
内容类型:application/x-java-object;类型=com.comcast.activation.message.vo.ActivationDataInfo
目的地:测试
群组:prac<br>
活页夹:
卡夫卡1:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a1p.sys.comcast.net
zk 节点:caapmsg-as-a1p.sys.comcast.net
卡夫卡2:
类型:卡夫卡
环境:
spring:
云:
溪流:
卡夫卡:
活页夹:
经纪人:caapmsg-as-a3p.sys.comcast.net
zk 节点:caapmsg-as-a3p.sys.comcast.net
默认绑定器:kafka2
有关详细信息,请参阅此示例 https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/multibinder-differentsystems/src/main/resources/application.yml。
我怀疑无限循环是由于向同一主题发送和接收消息而引起的。