@HystrixCommand 没有在 spring-integration @InboundChannelAdapter 上调用 fallbackMethod

@HystrixCommand not calling fallbackMethod on spring-integration @InboundChannelAdapter

我已经用 @EnableCircuitBreaker 注解了我的 @SpringBootApplication 类,

并想在 AMQP 代理(RabbitMQ)未启动的情况下测试回退(fallback),但我的回退方法从未被调用。

有什么想法吗?

package demo.sources.time;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;

import demo.common.dto.HelloDTO;
import demo.sources.configs.TimeSourceOptionsMetadata;

@EnableBinding(Source.class)
@EnableConfigurationProperties(TimeSourceOptionsMetadata.class)
public class TimeSource {
    private static final Logger logger = LoggerFactory.getLogger(TimeSource.class);

    @Value("${app.info.instance_index}")
    private String instanceIndex;

    @Autowired
    private TimeSourceOptionsMetadata timeSourceOptionsMetadata;

    @InboundChannelAdapter(value = Source.OUTPUT)
    @HystrixCommand(fallbackMethod = "fallbackTimerMessageSource")
    public HelloDTO timerMessageSource() {
        HelloDTO helloDTO = new HelloDTO();
        helloDTO.name = new SimpleDateFormat(this.timeSourceOptionsMetadata.getFormat()).format(new Date());
        logger.info("[{}]Produced: '{}'", instanceIndex, helloDTO);
        return helloDTO;
    }

    public HelloDTO fallbackTimerMessageSource() {
        logger.error("Hystrix fallbackTimerMessageSource handled exception.")
        return new HelloDTO();
    }
}

堆栈跟踪是:

2017-07-10 22:26:52.212  INFO 78432 --- [hystrix-TimeSource$$EnhancerBySpringCGLIB$c7b153-1]     demo.sources.time.TimeSource             : [0]Produced: 'Hello 2017-07-10 22:26:52!'
2017-07-10 22:26:52.302 ERROR 78432 --- [task-scheduler-1] o.s.integration.handler.LoggingHandler   :     org.springframework.messaging.MessageHandlingException: error occurred in message handler     [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@317fa27f]; nested exception is     org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused),     failedMessage=GenericMessage [payload=byte[49], headers={id=19675fd6-5e64-fcbe-9ee1-33eeec3b25e1, contentType=text/plain,     originalContentType=application/json;charset=UTF-8, timestamp=1499718412291}]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
        <snip>
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
        at     org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java    :62)
        at     org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:368)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:565)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1430)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:712)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:134)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:122)
        at     org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMess    ageHandler.java:109)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        ... 30 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:859)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1000)
        at     org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:356)
        ... 38 more

根据你的StackTrace(at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send),问题已经离timerMessageSource()方法很远了。

您应该考虑专门针对 AmqpOutboundEndpoint 实现一个 AbstractRequestHandlerAdvice。您可以在 Reference Manual 中找到有关该问题的文档。

还有一些示例(sample)演示了它是如何工作的。

我通过直接注入消息通道并使用其发送方法绕过了它:

/**
 * Time source that builds a {@link HelloDTO} on a fixed schedule and sends it itself through
 * the injected {@link Source#OUTPUT} channel. Because the send happens inside the
 * Hystrix-wrapped method, an exception raised while sending is thrown within the command and
 * can be routed to the fallback.
 */
@EnableBinding(Source.class)
@EnableConfigurationProperties(TimeSourceOptionsMetadata.class)
public class TimeSource {
    private static final Logger logger = LoggerFactory.getLogger(TimeSource.class);

    /** Maximum time, in milliseconds, to wait for the channel to accept a message. */
    private static final long SEND_TIMEOUT_MILLIS = 1000L;

    /** Value of the {@code app.info.instance_index} property; used only as a log prefix. */
    @Value("${app.info.instance_index}")
    private String instanceIndex;

    /** Provides the date format pattern applied to the generated timestamp. */
    @Autowired
    private TimeSourceOptionsMetadata timeSourceOptionsMetadata;

    /** Output channel of the {@link Source} binding, injected by qualifier. */
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel outputMessageChannel;

    /**
     * Builds a timestamped {@link HelloDTO} and sends it through the injected
     * {@link MessageChannel}, wrapped in a Hystrix command.
     *
     * @throws IllegalStateException if the channel reports that the message was not sent
     *         within {@link #SEND_TIMEOUT_MILLIS}
     */
    @Scheduled(fixedDelay = 2500, initialDelay = 500)
    @HystrixCommand(fallbackMethod = "fallbackTimerMessageSource")
    public void timerMessageSource() {
        HelloDTO helloDTO = new HelloDTO();
        // A new SimpleDateFormat per call: the class is not thread-safe, so it is not shared.
        helloDTO.name = new SimpleDateFormat(this.timeSourceOptionsMetadata.getFormat()).format(new Date());
        logger.info("[{}]Produced: '{}'", instanceIndex, helloDTO);
        Message<HelloDTO> theMessage = MessageBuilder.withPayload(helloDTO)
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .build();
        // send() signals "not sent / timed out" by returning false instead of throwing;
        // turn that into an exception so the Hystrix fallback is triggered for it as well.
        boolean sent = outputMessageChannel.send(theMessage, SEND_TIMEOUT_MILLIS);
        if (!sent) {
            throw new IllegalStateException("Message was not sent to channel '" + Source.OUTPUT
                    + "' within " + SEND_TIMEOUT_MILLIS + " ms");
        }
    }

    /**
     * Hystrix fallback for {@link #timerMessageSource()}; logs the failure and its cause.
     *
     * @param t the exception that made the command fail
     */
    public void fallbackTimerMessageSource(Throwable t) {
        logger.error("Hystrix fallbackTimerMessageSource handle exception. The original exception was {}", t.getMessage());
        Throwable cause = t.getCause();
        logger.error("Exception cause was: {} {}", (cause == null ? "null" : cause.getClass().getName()),
                    (cause == null ? "null" : cause.getMessage()));
    }
}