使用 TcpOutboundGateway 和 CachingClientConnectionFactory 时偶发错误 "Cannot correlate response - no pending reply"

Spurious error "Cannot correlate response - no pending reply" using TcpOutboundGateway and CachingClientConnectionFactory

我在多线程环境中将 TcpOutboundGateway 与 CachingClientConnectionFactory 结合使用时,偶尔会出现响应关联错误。

日志消息是: 2015-05-26 14:50:38.406 ERROR 3320 --- [pool-2-thread-2] o.s.i.ip.tcp.TcpOutboundGateway : Cannot correlate response - no pending reply

从单线程发送时不会出现该错误。我已经在 2 台物理机(Windows 7 和 Fedora 20)上测试过。我使用的是 Spring Boot。

该错误会导致发送方因未收到响应而出现超时错误。

下面是我的简化代码。请注意,它并不总是会产生错误——错误是偶发的。该代码使用了 TcpOutboundGateway 和 TcpInboundGateway,但在我的实际应用程序中,服务器是遗留的(非 Spring)Java 代码,所以我使用 CachingClientConnectionFactory 来提高性能。

/**
 * Spring Boot configuration and entry point for a self-contained reproduction
 * of the "Cannot correlate response - no pending reply" error.
 *
 * <p>The class wires a TCP client ({@link TcpOutboundGateway} backed by a
 * {@link CachingClientConnectionFactory}) and a TCP server
 * ({@link TcpInboundGateway}) inside the same application context, both on
 * {@code localhost:9003}, and then drives the client from {@link #lc}
 * concurrent {@link Worker} threads.
 *
 * <p>The public static fields are shared, mutable state read and written by
 * {@link Worker}; they are kept as plain fields so that class keeps compiling.
 */
@Configuration
@ComponentScan
@EnableAutoConfiguration
public class Test {

    //**************** Client **********************************************

    /** Channel on which the outbound gateway publishes the replies it receives. */
    @Bean
    public MessageChannel replyChannel() {
        return new DirectChannel();
    }

    /** Channel the workers send their requests to; consumed by the outbound gateway. */
    @Bean
    public MessageChannel sendChannel() {
        return new DirectChannel();
    }

    /**
     * Client connection factory for {@code localhost:9003}, wrapped in a
     * caching factory constructed with a size argument of 4.
     */
    @Bean
    AbstractClientConnectionFactory tcpNetClientConnectionFactory() {
        AbstractClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("localhost", 9003);
        CachingClientConnectionFactory cachingClientConnectionFactory = new CachingClientConnectionFactory(tcpNetClientConnectionFactory, 4);
        return cachingClientConnectionFactory;
    }

    /**
     * Outbound gateway subscribed to {@code sendChannel}; it uses the caching
     * client connection factory and routes replies to {@link #replyChannel()}.
     */
    @Bean
    @ServiceActivator(inputChannel = "sendChannel")
    TcpOutboundGateway tcpOutboundGateway() {
        TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
        tcpOutboundGateway.setConnectionFactory(tcpNetClientConnectionFactory());
        tcpOutboundGateway.setReplyChannel(replyChannel());

        return tcpOutboundGateway;
    }
    //******************************************************************


    //**************** Server **********************************************

    /** Channel on which the inbound gateway delivers incoming requests. */
    @Bean
    public MessageChannel receiveChannel() {
        return new DirectChannel();
    }

    /** Server connection factory listening on port 9003 with single-use connections disabled. */
    @Bean
    TcpNetServerConnectionFactory tcpNetServerConnectionFactory() {
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory =  new TcpNetServerConnectionFactory(9003);
        tcpNetServerConnectionFactory.setSingleUse(false);
        return tcpNetServerConnectionFactory;
    }

    /** Inbound gateway that forwards requests from the server factory to {@code receiveChannel}. */
    @Bean
    TcpInboundGateway tcpInboundGateway() {
        TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
        tcpInboundGateway.setConnectionFactory(tcpNetServerConnectionFactory());
        tcpInboundGateway.setRequestChannel(receiveChannel());
        return tcpInboundGateway;
    }
    //******************************************************************

    /** Prototype-scoped worker, so every lookup in {@link #main} yields a fresh instance. */
    @Bean
    @Scope("prototype")
    Worker worker() {
        return new Worker();
    }

    /** Number of worker threads started by {@link #main}. */
    public volatile static int lc = 4;
    /** Starts at {@link #lc}; decremented by each worker when it begins running. */
    public volatile static int counter = lc;
    /** Wall-clock time (ms) taken right after the application context has started. */
    public volatile static long totStartTime = 0;
    /** Total number of messages numbered so far across all workers. */
    public volatile static int messageCount = 0;

    /**
     * Atomically increments the global message counter.
     *
     * @return the new value of {@link #messageCount}
     */
    public static synchronized int incMessageCount(){
        return ++messageCount;
    }


    /**
     * Boots the context, starts {@link #lc} worker threads, waits 20 seconds
     * for them to finish sending, then stops the context.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        //new LegaServer();
        ConfigurableApplicationContext applicationContext = SpringApplication.run(Test.class, args);
        totStartTime = System.currentTimeMillis();

        for (int z = 0; z < lc; z++) {
            new Thread((Worker) applicationContext.getBean("worker")).start();
        }

        try {
            // sleep is static: call it on the class, not on the current-thread instance
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, then fall
            // through so the context is still stopped.
            Thread.currentThread().interrupt();
        }

        applicationContext.stop();

    }
}


/**
 * Server-side endpoint: consumes requests arriving on {@code receiveChannel}
 * and produces the reply that is sent back to the client.
 */
@MessageEndpoint
class RequestHandler {

    // Decode explicitly rather than with the platform default charset, which
    // differs between machines. UTF-8 is the documented default charset of
    // Spring Integration's TCP message mapper - NOTE(review): confirm if a
    // custom mapper or charset is configured.
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    /**
     * Logs the request text to stdout and echoes it back with a fixed prefix.
     *
     * @param in raw request payload bytes
     * @return {@code "Blah blah "} followed by the decoded request text
     */
    @ServiceActivator(inputChannel = "receiveChannel")
    public String rxHandler(byte [] in) {
        String s = new String(in, UTF_8);
        System.out.println("rxHandler:"+s);
        return "Blah blah " + s;
    }

}

/**
 * Client-side endpoint: consumes the replies that arrive on
 * {@code replyChannel} and prints them.
 */
@MessageEndpoint
class ResponseHandler {

    // Decode explicitly rather than with the platform default charset, which
    // differs between machines. UTF-8 is the documented default charset of
    // Spring Integration's TCP message mapper - NOTE(review): confirm against
    // the encoding the real (legacy) server uses.
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    /**
     * Prints the decoded reply text to stdout.
     *
     * @param in raw reply payload bytes
     */
    @ServiceActivator(inputChannel = "replyChannel")
    public void replyHandler(byte [] in) {
        System.out.println("replyHandler:"+new String(in, UTF_8));
    }

}

/**
 * Load-generating task: sends 20 numbered test messages to
 * {@code sendChannel}. The worker that draws the final message number
 * ({@code Test.lc * 20}) prints the overall timing summary.
 */
class Worker implements Runnable {

    /** Request channel, injected by Spring. */
    @Autowired
    @Qualifier("sendChannel")
    MessageChannel dc;

    /**
     * Sends the messages one after another. A failed send is reported on
     * stdout and does not stop the remaining sends.
     */
    @Override
    public void run() {
        // volatile gives visibility only; "--" is a read-modify-write, so it
        // must be guarded when several workers start at once. Test.class is
        // the same monitor Test.incMessageCount() uses.
        synchronized (Test.class) {
            Test.counter--;
        }
        int locMessageCount=0;
        for (int t = 0; t < 20; t++) {

            locMessageCount = Test.incMessageCount();

            // The header value is an Integer, so the map cannot be <String, String>.
            Map<String, Object> hs = new HashMap<String, Object>();
            hs.put("context", Integer.valueOf(Test.counter));

            GenericMessage<String> message = new GenericMessage<String>("this is a test message " + locMessageCount, hs);

            try {
                dc.send(message);
            } catch (Exception e) {
                // Report which message failed and why, without a full stack trace.
                System.out.println("locMessageCount:"+locMessageCount + " send failed: " + e);
            }

        }

        // Only the worker that was handed the very last message number reports totals.
        if (locMessageCount == (Test.lc*20)) {
            long totfinTime = System.currentTimeMillis();
            System.out.println("Tot. Time taken: " + (totfinTime - Test.totStartTime));
            System.out.println("Tot. TPS: " + (1000 * 20* Test.lc) / (totfinTime - Test.totStartTime));
            System.out.println("Tot. messages: " + Test.messageCount);
        }

    }
}

如有任何建议,我将不胜感激;也感谢到目前为止得到的帮助。谢谢!

谢谢;这是出站网关与缓存连接工厂组合使用时的一个 bug;请提交一个 JIRA issue。

问题在于,在第一个线程 (Thread-5) 移除其挂起的回复之前,连接就已被放回池中(并被重新使用);结果该线程移除的是新的挂起回复(属于 Thread-2),而不是它自己的。

很遗憾,我没有适合您的简单解决方法;它需要在网关中更改代码才能修复它。