使用 TcpOutboundGateway 和 CachingClientConnectionFactory 的虚假错误 "Cannot correlate response - no pending reply"
Spurious error "Cannot correlate response - no pending reply" using TcpOutboundGateway and CachingClientConnectionFactory
我在多线程上下文中将 TcpOutboundGateway 与 CachingClientConnectionFactory 结合使用时出现虚假关联错误。
日志消息是:
2015-05-26 14:50:38.406 错误 3320 --- [pool-2-thread-2] o.s.i.ip.tcp.TcpOutboundGateway:无法关联响应 - 没有待处理的回复
从单线程发送时我没有收到错误,我已经测试了 2 台物理机 - Windows 7 和 Fedora 20。我正在使用 Spring boot
它导致未收到响应的发送超时错误。
下面是我的简化代码:
请注意,它并不总是会产生错误 - 它是虚假的
该代码使用了 TcpOutboundGateway 和 TcpInboundGateway,但在我的实际应用程序中,服务器是遗留的(不是 Spring)Java 代码,所以我使用 CachingClientConnectionFactory 来提高性能
@Configuration
@ComponentScan
@EnableAutoConfiguration
public class Test {
//**************** Client **********************************************
@Bean
public MessageChannel replyChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel sendChannel() {
MessageChannel directChannel = new DirectChannel();
return directChannel;
}
@Bean
AbstractClientConnectionFactory tcpNetClientConnectionFactory() {
AbstractClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("localhost", 9003);
CachingClientConnectionFactory cachingClientConnectionFactory = new CachingClientConnectionFactory(tcpNetClientConnectionFactory, 4);
return cachingClientConnectionFactory;
}
@Bean
@ServiceActivator(inputChannel = "sendChannel")
TcpOutboundGateway tcpOutboundGateway() {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(tcpNetClientConnectionFactory());
tcpOutboundGateway.setReplyChannel(replyChannel());
return tcpOutboundGateway;
}
//******************************************************************
//**************** Server **********************************************
@Bean
public MessageChannel receiveChannel() {
return new DirectChannel();
}
@Bean
TcpNetServerConnectionFactory tcpNetServerConnectionFactory() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(9003);
tcpNetServerConnectionFactory.setSingleUse(false);
return tcpNetServerConnectionFactory;
}
@Bean
TcpInboundGateway tcpInboundGateway() {
TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
tcpInboundGateway.setConnectionFactory(tcpNetServerConnectionFactory());
tcpInboundGateway.setRequestChannel(receiveChannel());
return tcpInboundGateway;
}
//******************************************************************
@Bean
@Scope("prototype")
Worker worker() {
return new Worker();
}
public volatile static int lc = 4;
public volatile static int counter = lc;
public volatile static long totStartTime = 0;
public volatile static int messageCount = 0;
public static synchronized int incMessageCount(){
return ++messageCount;
}
public static void main(String args[]) {
//new LegaServer();
ConfigurableApplicationContext applicationContext = SpringApplication.run(Test.class, args);
totStartTime = System.currentTimeMillis();
for (int z = 0; z < lc; z++) {
new Thread((Worker) applicationContext.getBean("worker")).start();
}
try {
Thread.currentThread().sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
applicationContext.stop();
}
}
@MessageEndpoint
class RequestHandler {
@ServiceActivator(inputChannel = "receiveChannel")
public String rxHandler(byte [] in) {
String s = new String(in);
System.out.println("rxHandler:"+s);
return "Blah blah " + s;
}
}
@MessageEndpoint
class ResponseHandler {
@ServiceActivator(inputChannel = "replyChannel")
public void replyHandler(byte [] in) {
System.out.println("replyHandler:"+new String(in));
}
}
class Worker implements Runnable {
@Autowired
@Qualifier("sendChannel")
MessageChannel dc;
@Override
public void run() {
Test.counter--;
int locMessageCount=0;
long startTime = System.currentTimeMillis();
for (int t = 0; t < 20; t++) {
locMessageCount = Test.incMessageCount();
Map hs = new HashMap<String, String>();
hs.put("context", new Integer(Test.counter));
GenericMessage message = new GenericMessage("this is a test message " + locMessageCount, hs);
try {
boolean sent = dc.send(message);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("locMessageCount:"+locMessageCount);
}
}
if (locMessageCount == (Test.lc*20)) {
long totfinTime = System.currentTimeMillis();
System.out.println("Tot. Time taken: " + (totfinTime - Test.totStartTime));
System.out.println("Tot. TPS: " + (1000 * 20* Test.lc) / (totfinTime - Test.totStartTime));
System.out.println("Tot. messages: " + Test.messageCount);
}
}
}
如有任何建议,我将不胜感激,到目前为止我得到的帮助也是如此。 TY
谢谢;这是出站网关和缓存连接工厂组合的错误;请开一个JIRA Issue.
问题在于,在第一个线程 (Thread-5) 删除挂起的回复之前,连接被添加回池中(并重新使用);他最终删除了新的待处理回复(对于 Thread-2)而不是他自己的。
很遗憾,我没有适合您的简单解决方法;它需要在网关中更改代码才能修复它。
我在多线程上下文中将 TcpOutboundGateway 与 CachingClientConnectionFactory 结合使用时出现虚假关联错误。
日志消息是: 2015-05-26 14:50:38.406 错误 3320 --- [pool-2-thread-2] o.s.i.ip.tcp.TcpOutboundGateway:无法关联响应 - 没有待处理的回复
从单线程发送时我没有收到错误,我已经测试了 2 台物理机 - Windows 7 和 Fedora 20。我正在使用 Spring boot
它导致未收到响应的发送超时错误。
下面是我的简化代码: 请注意,它并不总是会产生错误 - 它是虚假的 该代码使用了 TcpOutboundGateway 和 TcpInboundGateway,但在我的实际应用程序中,服务器是遗留的(不是 Spring)Java 代码,所以我使用 CachingClientConnectionFactory 来提高性能
@Configuration
@ComponentScan
@EnableAutoConfiguration
public class Test {
//**************** Client **********************************************
@Bean
public MessageChannel replyChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel sendChannel() {
MessageChannel directChannel = new DirectChannel();
return directChannel;
}
@Bean
AbstractClientConnectionFactory tcpNetClientConnectionFactory() {
AbstractClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("localhost", 9003);
CachingClientConnectionFactory cachingClientConnectionFactory = new CachingClientConnectionFactory(tcpNetClientConnectionFactory, 4);
return cachingClientConnectionFactory;
}
@Bean
@ServiceActivator(inputChannel = "sendChannel")
TcpOutboundGateway tcpOutboundGateway() {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(tcpNetClientConnectionFactory());
tcpOutboundGateway.setReplyChannel(replyChannel());
return tcpOutboundGateway;
}
//******************************************************************
//**************** Server **********************************************
@Bean
public MessageChannel receiveChannel() {
return new DirectChannel();
}
@Bean
TcpNetServerConnectionFactory tcpNetServerConnectionFactory() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(9003);
tcpNetServerConnectionFactory.setSingleUse(false);
return tcpNetServerConnectionFactory;
}
@Bean
TcpInboundGateway tcpInboundGateway() {
TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
tcpInboundGateway.setConnectionFactory(tcpNetServerConnectionFactory());
tcpInboundGateway.setRequestChannel(receiveChannel());
return tcpInboundGateway;
}
//******************************************************************
@Bean
@Scope("prototype")
Worker worker() {
return new Worker();
}
public volatile static int lc = 4;
public volatile static int counter = lc;
public volatile static long totStartTime = 0;
public volatile static int messageCount = 0;
public static synchronized int incMessageCount(){
return ++messageCount;
}
public static void main(String args[]) {
//new LegaServer();
ConfigurableApplicationContext applicationContext = SpringApplication.run(Test.class, args);
totStartTime = System.currentTimeMillis();
for (int z = 0; z < lc; z++) {
new Thread((Worker) applicationContext.getBean("worker")).start();
}
try {
Thread.currentThread().sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
applicationContext.stop();
}
}
@MessageEndpoint
class RequestHandler {
@ServiceActivator(inputChannel = "receiveChannel")
public String rxHandler(byte [] in) {
String s = new String(in);
System.out.println("rxHandler:"+s);
return "Blah blah " + s;
}
}
@MessageEndpoint
class ResponseHandler {
@ServiceActivator(inputChannel = "replyChannel")
public void replyHandler(byte [] in) {
System.out.println("replyHandler:"+new String(in));
}
}
class Worker implements Runnable {
@Autowired
@Qualifier("sendChannel")
MessageChannel dc;
@Override
public void run() {
Test.counter--;
int locMessageCount=0;
long startTime = System.currentTimeMillis();
for (int t = 0; t < 20; t++) {
locMessageCount = Test.incMessageCount();
Map hs = new HashMap<String, String>();
hs.put("context", new Integer(Test.counter));
GenericMessage message = new GenericMessage("this is a test message " + locMessageCount, hs);
try {
boolean sent = dc.send(message);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("locMessageCount:"+locMessageCount);
}
}
if (locMessageCount == (Test.lc*20)) {
long totfinTime = System.currentTimeMillis();
System.out.println("Tot. Time taken: " + (totfinTime - Test.totStartTime));
System.out.println("Tot. TPS: " + (1000 * 20* Test.lc) / (totfinTime - Test.totStartTime));
System.out.println("Tot. messages: " + Test.messageCount);
}
}
}
如有任何建议,我将不胜感激,到目前为止我得到的帮助也是如此。 TY
谢谢;这是出站网关和缓存连接工厂组合的错误;请开一个JIRA Issue.
问题在于,在第一个线程 (Thread-5) 删除挂起的回复之前,连接被添加回池中(并重新使用);他最终删除了新的待处理回复(对于 Thread-2)而不是他自己的。
很遗憾,我没有适合您的简单解决方法;它需要在网关中更改代码才能修复它。