意外消息 - 与多个服务器通信时没有端点注册到连接拦截器
Unexpected message - no endpoint registered with connection interceptor while communicating with multiple servers
@Configuration
@Component
public class GatewayAqrConfig {
@Autowired
ConnectorService connectorService;
@Autowired
MasterService masterService;
private HashMap<ConnectorPK, GatewayAqr> connectorMap;
@Bean
@Scope(value = "prototype")
public AbstractClientConnectionFactory clientCF(Connector connector , Master master) {
TcpNetClientConnectionFactory clientConnectionFactory = new TcpNetClientConnectionFactory(connector.getAqrIpAddr(), connector.getAqrIpPortNo());
clientConnectionFactory.setSingleUse(false);
MyByteArraySerializer obj = new MyByteArraySerializer(master.getAqrMsgHeaderLength(), master.getAqrId());
clientConnectionFactory.setSerializer(obj);
clientConnectionFactory.setDeserializer(obj);
clientConnectionFactory.setSoKeepAlive(true);
TcpMessageMapper tcpMessageMapper = new TcpMessageMapper();
tcpMessageMapper.setCharset("ISO-8859-1");
clientConnectionFactory.setMapper(tcpMessageMapper);
clientConnectionFactory.setBeanName(connector.getAqrIpAddr() + ":" + connector.getAqrIpPortNo());
clientConnectionFactory.afterPropertiesSet();
clientConnectionFactory.start();
return clientConnectionFactory;
}
@Bean
@Scope(value = "prototype")
public TcpSendingMessageHandler tcpOutGateway(AbstractClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler messageHandler = new TcpSendingMessageHandler();
messageHandler.setConnectionFactory(connectionFactory);
messageHandler.setClientMode(true);
messageHandler.setTaskScheduler(getTaskScheduler());
messageHandler.setStatsEnabled(true);
messageHandler.afterPropertiesSet();
messageHandler.start();
return messageHandler;
}
@Bean
@Scope(value = "prototype")
public TcpReceivingChannelAdapter tcpInGateway(AbstractClientConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
messageHandler.setConnectionFactory(connectionFactory);
messageHandler.setClientMode(true);
messageHandler.setOutputChannel(receive());
messageHandler.setAutoStartup(true);
messageHandler.setTaskScheduler(getTaskScheduler());
messageHandler.afterPropertiesSet();
messageHandler.start();
return messageHandler;
}
@Bean
@Scope(value = "prototype")
public TaskScheduler getTaskScheduler() {
TaskScheduler ts = new ThreadPoolTaskScheduler();
return ts;
}
@Bean
public MessageChannel receive() {
QueueChannel channel = new QueueChannel();
return channel;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return new PollerMetadata();
}
@Bean
@Transactional
public HashMap<ConnectorPK, GatewayAqr> gatewayAqr() throws Exception {
connectorMap = new HashMap();
Connector connector = null;
ConnectorPK connectorPK = null;
Master master = null;
TcpConnectionSupport connectionSupport = null;
// 1. Get List of Connections configured in Database
List<Connector> connectors = connectorService.getConnections();
if (connectors.size() > 0) {
for (int i = 0; i < connectors.size(); i++) {
// 2. Get the connection details
connector = connectors.get(i);
connectorPK = aqrConnector.getConnectorpk();
master = masterService.findById(connectorPK.getAcuirerId());
try {
// 3. Create object of TcpNetClientConnectionFactory for each Acquirer connection
AbstractClientConnectionFactory clientConnectionFactory = clientCF(aqrConnector, aqrMaster);
// 4. Create TcpSendingMessageHandler for the Connection
TcpSendingMessageHandler outHandler = tcpOutGateway(clientConnectionFactory);
// 5. Create TcpReceivingChannelAdapter object for the Connection and assign it to receive channel
TcpReceivingChannelAdapter inHandler = tcpInGateway(clientConnectionFactory);
// 6. Generate the GatewayAqr object
GatewayAqr gatewayAqr = new GatewayAqr(clientConnectionFactory, outHandler, inHandler);
// 7. Put in the MAP acuirerPK and Send MessageHandler object
connectorMap.put(aqrConnectorPK, gatewayAquirer);
} catch (Exception e) {
}
} // for
} // if
return connectorMap;
}
}
*********************************************************************************************************************************
@EnableIntegration
@IntegrationComponentScan(basePackageClasses = {GatewayEventConfig.class,GatewayAqrConfig.class })
@Configuration
@ComponentScan(basePackages = {"com.iz.zw.gateway.impl", "com.iz.zw.configuration"})
@Import({GatewayEventConfig.class,GatewayAquirerConfig.class})
public class GatewayConfig {
@Autowired
private GatewayAsyncReply<Object, Message<?>> gatewayAsyncReply;
@Autowired
private GatewayCorrelationStrategy gatewayCorrelationStrategy;
@Autowired
private HashMap<ConnectorPK, GatewayAqr> gatewayAqrs;
@Autowired
ConnectorService connectorService;
@Autowired
GatewayResponseDeserializer gatewayResponseDeserializer;
@MessagingGateway(defaultRequestChannel = "send")
public interface Gateway {
void waitForResponse(TransactionMessage transaction);
}
@Bean
public MessageChannel send() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
@ServiceActivator(inputChannel = "send")
public BarrierMessageHandlerWithLateGoodResponse barrier() {
BarrierMessageHandlerWithLateGoodResponse barrier = new BarrierMessageHandlerWithLateGoodResponse(25000, this.gatewayCorrelationStrategy);
barrier.setAsync(true);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateGoodresponseChannel());
return barrier;
}
@ServiceActivator(inputChannel = "out")
public void printMessage(Message<?> message) {
System.out.println("in out channel");
}
@Transformer(inputChannel = "receive", outputChannel = "process")
public TransactionMessage convert(byte[] response) {
logger.debug("Response Received", Arrays.toString(response));
TransactionMessage transactionMessage = gatewayResponseDeserializer.deserializeResponse(response);
System.out.println("Response : " + response);
return transactionMessage;
}
@ServiceActivator(inputChannel = "process")
@Bean
public MessageHandler releaser() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
gatewayAsyncReply.put(message);
barrier().trigger(message);
} catch (GatewayLateGoodMessageException exception) {
System.out.println("Late good response..!");
gatewayAsyncReply.get(message);
lateGoodresponseChannel().send(message);
}
}
};
}
@Bean
public MessageChannel process() {
QueueChannel channel = new QueueChannel();
return channel;
}
@Bean
public MessageChannel out() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public MessageChannel lateGoodresponseChannel() {
QueueChannel channel = new QueueChannel();
return channel;
}
@ServiceActivator(inputChannel="lateGoodresponseChannel")
public void handleLateGoodResponse(Message<?> message) {
String strSTAN = null;
String strResponse = null;
Message<?> respMessage = null;
if(message instanceof TransactionMessage){
strSTAN = ((TransactionMessage)message).getStan();
respMessage = gatewayAsyncReply.get(strSTAN);
if (null != respMessage) {
strResponse = (String) message.getPayload();
}
}
logger.info("Late Good Response: " + strResponse);
}
}
*********************************************************************************************************************************
@Configuration
public class GatewayEventConfig {
private static final Logger logger = LoggerFactory.getLogger(GatewayEventConfig.class);
@Bean
public ApplicationEventListeningMessageProducer tcpEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(new Class[] {TcpConnectionOpenEvent.class, TcpConnectionCloseEvent.class, TcpConnectionExceptionEvent.class});
producer.setOutputChannel(tcpEventChannel());
producer.setAutoStartup(true);
producer.setTaskScheduler(getEventTaskScheduler());
producer.start();
return producer;
}
@Bean
public TaskScheduler getEventTaskScheduler() {
TaskScheduler ts = new ThreadPoolTaskScheduler();
return ts;
}
@Bean
public MessageChannel tcpEventChannel() {
return new QueueChannel();
}
@Transactional
@ServiceActivator(inputChannel = "tcpEventChannel")
public void tcpConnectionEvent(TcpConnectionEvent event) {
System.out.println("In publishing" + event.toString());
String strConnectionFactory = event.getConnectionFactoryName();
if (strConnectionFactory.equals("connection1")) {
//send some message to connector
} else {
// send message to another connector
}
}
}
这是我的配置文件,我的应用程序一启动就尝试连接到 2 个服务器。
如上所述,我为 2 个服务器做了 2 个配置 class
GatewayAqrConfig1 和 GatewayConfig1 classes 用于第一个服务器连接
GatewayAqrConfig2 和 GatewayConfig2 classes 用于第二个服务器连接
使用事件我正在连接到服务器并发送连接设置消息,如果服务器已经启动并且如果我已经启动了我的应用程序,
它获取事件、连接并发送消息,但我没有收到响应,而是收到如下警告
**WARN TcpNetConnection:186 - Unexpected message - no endpoint registered with connection interceptor:**
i.e connection does not registers the listener properly
但是如果我先启动我的应用程序然后再启动服务器我会得到完美的响应,因为我正在连接到服务器
我无法重新启动它?我的应用程序应该连接到已经启动的服务器?可能是什么问题呢 ?
使用的版本:
Spring 集成版本:4.3.1
Spring 版本:4.3.2
JDK 1.8 JBOSS EAP 7
该 WARN 消息意味着,不知何故,在没有 TcpReceivingChannelAdapter
向连接工厂注册的情况下收到了入站消息。客户端模式应该没有区别。
仔细查看了您的代码,原型 bean 应该没问题,只要您使用这些对象(尤其是 TcpMessageHandler
直接使用而不是通过框架)。
根据您的配置,我不太清楚这是怎么发生的;当您在接收适配器上调用 setConnectionFactory
时,侦听器已注册。
如果你可以用 trimmed-down 项目和 post 某个地方重现它,我会看看。
@Configuration
@Component
public class GatewayAqrConfig {
@Autowired
ConnectorService connectorService;
@Autowired
MasterService masterService;
private HashMap<ConnectorPK, GatewayAqr> connectorMap;
@Bean
@Scope(value = "prototype")
public AbstractClientConnectionFactory clientCF(Connector connector , Master master) {
TcpNetClientConnectionFactory clientConnectionFactory = new TcpNetClientConnectionFactory(connector.getAqrIpAddr(), connector.getAqrIpPortNo());
clientConnectionFactory.setSingleUse(false);
MyByteArraySerializer obj = new MyByteArraySerializer(master.getAqrMsgHeaderLength(), master.getAqrId());
clientConnectionFactory.setSerializer(obj);
clientConnectionFactory.setDeserializer(obj);
clientConnectionFactory.setSoKeepAlive(true);
TcpMessageMapper tcpMessageMapper = new TcpMessageMapper();
tcpMessageMapper.setCharset("ISO-8859-1");
clientConnectionFactory.setMapper(tcpMessageMapper);
clientConnectionFactory.setBeanName(connector.getAqrIpAddr() + ":" + connector.getAqrIpPortNo());
clientConnectionFactory.afterPropertiesSet();
clientConnectionFactory.start();
return clientConnectionFactory;
}
@Bean
@Scope(value = "prototype")
public TcpSendingMessageHandler tcpOutGateway(AbstractClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler messageHandler = new TcpSendingMessageHandler();
messageHandler.setConnectionFactory(connectionFactory);
messageHandler.setClientMode(true);
messageHandler.setTaskScheduler(getTaskScheduler());
messageHandler.setStatsEnabled(true);
messageHandler.afterPropertiesSet();
messageHandler.start();
return messageHandler;
}
@Bean
@Scope(value = "prototype")
public TcpReceivingChannelAdapter tcpInGateway(AbstractClientConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
messageHandler.setConnectionFactory(connectionFactory);
messageHandler.setClientMode(true);
messageHandler.setOutputChannel(receive());
messageHandler.setAutoStartup(true);
messageHandler.setTaskScheduler(getTaskScheduler());
messageHandler.afterPropertiesSet();
messageHandler.start();
return messageHandler;
}
@Bean
@Scope(value = "prototype")
public TaskScheduler getTaskScheduler() {
TaskScheduler ts = new ThreadPoolTaskScheduler();
return ts;
}
@Bean
public MessageChannel receive() {
QueueChannel channel = new QueueChannel();
return channel;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return new PollerMetadata();
}
@Bean
@Transactional
public HashMap<ConnectorPK, GatewayAqr> gatewayAqr() throws Exception {
connectorMap = new HashMap();
Connector connector = null;
ConnectorPK connectorPK = null;
Master master = null;
TcpConnectionSupport connectionSupport = null;
// 1. Get List of Connections configured in Database
List<Connector> connectors = connectorService.getConnections();
if (connectors.size() > 0) {
for (int i = 0; i < connectors.size(); i++) {
// 2. Get the connection details
connector = connectors.get(i);
connectorPK = aqrConnector.getConnectorpk();
master = masterService.findById(connectorPK.getAcuirerId());
try {
// 3. Create object of TcpNetClientConnectionFactory for each Acquirer connection
AbstractClientConnectionFactory clientConnectionFactory = clientCF(aqrConnector, aqrMaster);
// 4. Create TcpSendingMessageHandler for the Connection
TcpSendingMessageHandler outHandler = tcpOutGateway(clientConnectionFactory);
// 5. Create TcpReceivingChannelAdapter object for the Connection and assign it to receive channel
TcpReceivingChannelAdapter inHandler = tcpInGateway(clientConnectionFactory);
// 6. Generate the GatewayAqr object
GatewayAqr gatewayAqr = new GatewayAqr(clientConnectionFactory, outHandler, inHandler);
// 7. Put in the MAP acuirerPK and Send MessageHandler object
connectorMap.put(aqrConnectorPK, gatewayAquirer);
} catch (Exception e) {
}
} // for
} // if
return connectorMap;
}
}
*********************************************************************************************************************************
@EnableIntegration
@IntegrationComponentScan(basePackageClasses = {GatewayEventConfig.class,GatewayAqrConfig.class })
@Configuration
@ComponentScan(basePackages = {"com.iz.zw.gateway.impl", "com.iz.zw.configuration"})
@Import({GatewayEventConfig.class,GatewayAquirerConfig.class})
public class GatewayConfig {
@Autowired
private GatewayAsyncReply<Object, Message<?>> gatewayAsyncReply;
@Autowired
private GatewayCorrelationStrategy gatewayCorrelationStrategy;
@Autowired
private HashMap<ConnectorPK, GatewayAqr> gatewayAqrs;
@Autowired
ConnectorService connectorService;
@Autowired
GatewayResponseDeserializer gatewayResponseDeserializer;
@MessagingGateway(defaultRequestChannel = "send")
public interface Gateway {
void waitForResponse(TransactionMessage transaction);
}
@Bean
public MessageChannel send() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
@ServiceActivator(inputChannel = "send")
public BarrierMessageHandlerWithLateGoodResponse barrier() {
BarrierMessageHandlerWithLateGoodResponse barrier = new BarrierMessageHandlerWithLateGoodResponse(25000, this.gatewayCorrelationStrategy);
barrier.setAsync(true);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateGoodresponseChannel());
return barrier;
}
@ServiceActivator(inputChannel = "out")
public void printMessage(Message<?> message) {
System.out.println("in out channel");
}
@Transformer(inputChannel = "receive", outputChannel = "process")
public TransactionMessage convert(byte[] response) {
logger.debug("Response Received", Arrays.toString(response));
TransactionMessage transactionMessage = gatewayResponseDeserializer.deserializeResponse(response);
System.out.println("Response : " + response);
return transactionMessage;
}
@ServiceActivator(inputChannel = "process")
@Bean
public MessageHandler releaser() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
gatewayAsyncReply.put(message);
barrier().trigger(message);
} catch (GatewayLateGoodMessageException exception) {
System.out.println("Late good response..!");
gatewayAsyncReply.get(message);
lateGoodresponseChannel().send(message);
}
}
};
}
@Bean
public MessageChannel process() {
QueueChannel channel = new QueueChannel();
return channel;
}
@Bean
public MessageChannel out() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public MessageChannel lateGoodresponseChannel() {
QueueChannel channel = new QueueChannel();
return channel;
}
@ServiceActivator(inputChannel="lateGoodresponseChannel")
public void handleLateGoodResponse(Message<?> message) {
String strSTAN = null;
String strResponse = null;
Message<?> respMessage = null;
if(message instanceof TransactionMessage){
strSTAN = ((TransactionMessage)message).getStan();
respMessage = gatewayAsyncReply.get(strSTAN);
if (null != respMessage) {
strResponse = (String) message.getPayload();
}
}
logger.info("Late Good Response: " + strResponse);
}
}
*********************************************************************************************************************************
@Configuration
public class GatewayEventConfig {
private static final Logger logger = LoggerFactory.getLogger(GatewayEventConfig.class);
@Bean
public ApplicationEventListeningMessageProducer tcpEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(new Class[] {TcpConnectionOpenEvent.class, TcpConnectionCloseEvent.class, TcpConnectionExceptionEvent.class});
producer.setOutputChannel(tcpEventChannel());
producer.setAutoStartup(true);
producer.setTaskScheduler(getEventTaskScheduler());
producer.start();
return producer;
}
@Bean
public TaskScheduler getEventTaskScheduler() {
TaskScheduler ts = new ThreadPoolTaskScheduler();
return ts;
}
@Bean
public MessageChannel tcpEventChannel() {
return new QueueChannel();
}
@Transactional
@ServiceActivator(inputChannel = "tcpEventChannel")
public void tcpConnectionEvent(TcpConnectionEvent event) {
System.out.println("In publishing" + event.toString());
String strConnectionFactory = event.getConnectionFactoryName();
if (strConnectionFactory.equals("connection1")) {
//send some message to connector
} else {
// send message to another connector
}
}
}
这是我的配置文件,我的应用程序一启动就尝试连接到 2 个服务器。 如上所述,我为 2 个服务器做了 2 个配置 class GatewayAqrConfig1 和 GatewayConfig1 classes 用于第一个服务器连接 GatewayAqrConfig2 和 GatewayConfig2 classes 用于第二个服务器连接 使用事件我正在连接到服务器并发送连接设置消息,如果服务器已经启动并且如果我已经启动了我的应用程序, 它获取事件、连接并发送消息,但我没有收到响应,而是收到如下警告
**WARN TcpNetConnection:186 - Unexpected message - no endpoint registered with connection interceptor:**
i.e connection does not registers the listener properly
但是如果我先启动我的应用程序然后再启动服务器我会得到完美的响应,因为我正在连接到服务器 我无法重新启动它?我的应用程序应该连接到已经启动的服务器?可能是什么问题呢 ?
使用的版本:
Spring 集成版本:4.3.1
Spring 版本:4.3.2
JDK 1.8 JBOSS EAP 7
该 WARN 消息意味着,不知何故,在没有 TcpReceivingChannelAdapter
向连接工厂注册的情况下收到了入站消息。客户端模式应该没有区别。
仔细查看了您的代码,原型 bean 应该没问题,只要您使用这些对象(尤其是 TcpMessageHandler
直接使用而不是通过框架)。
根据您的配置,我不太清楚这是怎么发生的;当您在接收适配器上调用 setConnectionFactory
时,侦听器已注册。
如果你可以用 trimmed-down 项目和 post 某个地方重现它,我会看看。