使用出站适配器配置断路器以处理连接超时问题
Configuring Circuit breaker with outbound adapter for handling connection timed out issue
<int:service-activator input-channel="toKafka" ref="conditionalProducerService" method="producerCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice1" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel2" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
<int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>
<int:chain input-channel="failedChannel2">
<int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
throw new RuntimeException("foo Pro");}
通过以上配置,我们正在尝试:
1.Expecting 让失败的消息传播到错误通道="failedChannel2" 而不是 happening.as 我在控制台中看不到转换后的输出。
2.CircuitBreaker 正在为 ServiceActivator 工作(如上所示与应用程序相关的异常)但是我们如何为出站适配器的失败情况配置 CB。示例:当连接超时或服务器突然关闭时 /network connection problem/some 将消息从 SI 通道发送到外部(kafka)server.Can 之前的环境问题,我们为这种情况配置了带有出站适配器的 CB。
根据有关断路器建议的 SI 文档,见下文。
“通常,此建议可能用于外部服务,其中可能需要一些时间才能失败(例如
作为尝试建立网络连接的超时)。
请建议如何实现this.Many谢谢。
更新配置:
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />
<int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
<constructor-arg ref="producerContext"/>
</bean>
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />
<int:transformer input-channel="errorChannel"
order="1" ref="transformerService1" method="transformFailed">
</int:transformer>
public void transformFailed(Message<?> message) {
APPLOGGER.log("transformer message test" + message);
public class ProducerMessageHandler extends KafkaProducerMessageHandler{
public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
super(kafkaProducerContext);
// TODO Auto-generated constructor stub
}
@Override
public void handleMessageInternal(final Message<?> message) throws Exception {
//super.handleMessageInternal(message);
throw new RuntimeException("test foo");
}
日志:
01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162- 3b93-9bb6-0699-89b15b20e904}]
调试:- com.XXX.ProducerMessageHandler#0 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}]
出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [com.XXX.ProducerMessageHandler#0];嵌套异常是 java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - 在通道 'toKafka' 上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d- 8f2c058dda4d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d -8f2c058dda4d}]
调试:- com.XXX.ProducerMessageHandler#0 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [com.XXX.ProducerMessageHandler#0];嵌套异常是 java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - 在通道 'toKafka' 上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44- f646aa932277}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44 -f646aa932277}]
出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6];嵌套异常是 org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException:断路器已打开 org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - 在通道 'toKafka' 上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745- 1387e6045e7d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745 -1387e6045e7d}]
出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6];嵌套异常是 org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException:断路器已打开 org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6
该建议仅适用于分配给它的端点,不适用于下游流;不幸的是,kafka 模式不允许将其应用于出站通道适配器。我创建了一个 JIRA issue for that.
一种变通方法是将 KafkaProducerMessageHandler
配置为 <bean/>
并从 <service-activator/>
配置 ref
。然后你可以应用你的断路器。
另一种解决方法是使用流入网关...
<int:service-activator ... ref="gw">
<int:request-handler-advice-chain ...
</int:service-activator>
<int:gateway id="gw" default-request-channel="toKafka"
default-reply-timeout="0"
error-channel="..." ... />
我不确定您为什么没有在错误频道中看到消息;通常,打开 DEBUG 日志记录将有助于调试这种东西。
编辑
我刚刚用这个测试过,它工作得很好...
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />
<int:service-activator input-channel="toKafka">
<bean class="com.example.Foo" />
<int:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
EDIT2
如果您不使用网关,您可以使用队列通道和轮询器来处理它。这对我也很好用...
<int:channel id="toKafka">
<int:queue />
</int:channel>
<int:service-activator input-channel="toKafka">
<bean class="com.example.Foo" />
<int:poller error-channel="errorChannel" fixed-delay="1000" />
<int:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
<property name="halfOpenAfter" value="12000"/>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
或者,您可以添加一个中流网关。
从聊天中复制以供将来参考 :
山姆:嗨加里
for(int i=0;i<4;i++){
try{
toKafka.send(MessageBuilder
.withPayload("hello").
build());
}catch(Exception e){
System.out.println("got exception : " + e); } }
这就是我发送消息的方式
Gary :所以您是直接发送到频道 - 您应该改用 MessagingGateway。
山姆:嗨,加里。
thanks.it 正在使用 Gateway。
使用 KafkaProducerMessageHandler 配置 CB 很好,但它涵盖了以下方法
下的任何失败
public void handleMessageInternal(final Message message) 抛出异常
但我想解决网络错误问题以及 invalid broker list/server down 它没有涵盖,我在控制台中收到异常像这样:
log
12-24@16:46:46,250 DEBUGspringframework.integration.kafka.outbound.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: GenericMessage [payload=TestVo[data=sample message]], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, kafka_topic=tried_in, kafka_partitionId=2, id=7b596368-0aee-ddaa-2168-dc403e22c38f, timestamp=1450955805294}]
12-24@16:55:12,630 WARN apache.kafka.common.network.Selector - Error in I/O with /1.2.0.3
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Unknown Source)
希望在这种情况下也调用 CB。
Gary : 连接异常应该发生在 handleMessageInternal()
如果没有抛出异常,那就是一个错误。我去看看。
Future 在 handleMessageInternal 中被丢弃 - 我将打开一个 JIRA 问题。
https://jira.spring.io/browse/INTEXT-218
Sam : ok.is 它会涵盖 kafka 服务器由于某种原因宕机的情况吗?
加里:是的;但您可能希望减少默认超时 (60s)
<int:service-activator input-channel="toKafka" ref="conditionalProducerService" method="producerCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice1" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel2" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
<int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>
<int:chain input-channel="failedChannel2">
<int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
throw new RuntimeException("foo Pro");}
通过以上配置,我们正在尝试:
1.Expecting 让失败的消息传播到错误通道="failedChannel2" 而不是 happening.as 我在控制台中看不到转换后的输出。
2.CircuitBreaker 正在为 ServiceActivator 工作(如上所示与应用程序相关的异常)但是我们如何为出站适配器的失败情况配置 CB。示例:当连接超时或服务器突然关闭时 /network connection problem/some 将消息从 SI 通道发送到外部(kafka)server.Can 之前的环境问题,我们为这种情况配置了带有出站适配器的 CB。
根据有关断路器建议的 SI 文档,见下文。
“通常,此建议可能用于外部服务,其中可能需要一些时间才能失败(例如 作为尝试建立网络连接的超时)。
请建议如何实现this.Many谢谢。
更新配置:
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />
<int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
<constructor-arg ref="producerContext"/>
</bean>
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />
<int:transformer input-channel="errorChannel"
order="1" ref="transformerService1" method="transformFailed">
</int:transformer>
public void transformFailed(Message<?> message) {
APPLOGGER.log("transformer message test" + message);
public class ProducerMessageHandler extends KafkaProducerMessageHandler{
public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
super(kafkaProducerContext);
// TODO Auto-generated constructor stub
}
@Override
public void handleMessageInternal(final Message<?> message) throws Exception {
//super.handleMessageInternal(message);
throw new RuntimeException("test foo");
}
日志:
01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162- 3b93-9bb6-0699-89b15b20e904}] 调试:- com.XXX.ProducerMessageHandler#0 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [com.XXX.ProducerMessageHandler#0];嵌套异常是 java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - 在通道 'toKafka' 上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d- 8f2c058dda4d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d -8f2c058dda4d}] 调试:- com.XXX.ProducerMessageHandler#0 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [com.XXX.ProducerMessageHandler#0];嵌套异常是 java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - 在通道 'toKafka' 上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44- f646aa932277}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44 -f646aa932277}] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6];嵌套异常是 org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException:断路器已打开 org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - 在通道 'toKafka' 上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745- 1387e6045e7d}] 01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745 -1387e6045e7d}] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6];嵌套异常是 org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException:断路器已打开 org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6
该建议仅适用于分配给它的端点,不适用于下游流;不幸的是,kafka 模式不允许将其应用于出站通道适配器。我创建了一个 JIRA issue for that.
一种变通方法是将 KafkaProducerMessageHandler
配置为 <bean/>
并从 <service-activator/>
配置 ref
。然后你可以应用你的断路器。
另一种解决方法是使用流入网关...
<int:service-activator ... ref="gw">
<int:request-handler-advice-chain ...
</int:service-activator>
<int:gateway id="gw" default-request-channel="toKafka"
default-reply-timeout="0"
error-channel="..." ... />
我不确定您为什么没有在错误频道中看到消息;通常,打开 DEBUG 日志记录将有助于调试这种东西。
编辑
我刚刚用这个测试过,它工作得很好...
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />
<int:service-activator input-channel="toKafka">
<bean class="com.example.Foo" />
<int:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
EDIT2
如果您不使用网关,您可以使用队列通道和轮询器来处理它。这对我也很好用...
<int:channel id="toKafka">
<int:queue />
</int:channel>
<int:service-activator input-channel="toKafka">
<bean class="com.example.Foo" />
<int:poller error-channel="errorChannel" fixed-delay="1000" />
<int:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
<property name="halfOpenAfter" value="12000"/>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
或者,您可以添加一个中流网关。
从聊天中复制以供将来参考 :
山姆:嗨加里
for(int i=0;i<4;i++){
try{
toKafka.send(MessageBuilder
.withPayload("hello").
build());
}catch(Exception e){
System.out.println("got exception : " + e); } }
这就是我发送消息的方式
Gary :所以您是直接发送到频道 - 您应该改用 MessagingGateway。 山姆:嗨,加里。 thanks.it 正在使用 Gateway。
使用 KafkaProducerMessageHandler 配置 CB 很好,但它涵盖了以下方法
下的任何失败public void handleMessageInternal(final Message message) 抛出异常
但我想解决网络错误问题以及 invalid broker list/server down 它没有涵盖,我在控制台中收到异常像这样:
log
12-24@16:46:46,250 DEBUGspringframework.integration.kafka.outbound.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: GenericMessage [payload=TestVo[data=sample message]], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, kafka_topic=tried_in, kafka_partitionId=2, id=7b596368-0aee-ddaa-2168-dc403e22c38f, timestamp=1450955805294}]
12-24@16:55:12,630 WARN apache.kafka.common.network.Selector - Error in I/O with /1.2.0.3
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Unknown Source)
希望在这种情况下也调用 CB。
Gary : 连接异常应该发生在 handleMessageInternal() 如果没有抛出异常,那就是一个错误。我去看看。
Future 在 handleMessageInternal 中被丢弃 - 我将打开一个 JIRA 问题。
https://jira.spring.io/browse/INTEXT-218
Sam : ok.is 它会涵盖 kafka 服务器由于某种原因宕机的情况吗?
加里:是的;但您可能希望减少默认超时 (60s)