意外消息 - 与多个服务器通信时没有端点注册到连接拦截器

Unexpected message - no endpoint registered with connection interceptor while communicating with multiple servers

    /**
     * Creates, for every connector returned by {@link ConnectorService#getConnections()}, a TCP
     * client connection factory together with a sending handler and a receiving adapter that
     * share that factory, and exposes the result as a map keyed by the connector's primary key.
     *
     * <p>All inbound messages from every connection are routed to the shared {@code receive}
     * channel.
     */
    @Configuration
@Component
public class GatewayAqrConfig {

    private static final Logger logger = LoggerFactory.getLogger(GatewayAqrConfig.class);

    @Autowired
    ConnectorService connectorService;

    @Autowired
    MasterService masterService;

    private HashMap<ConnectorPK, GatewayAqr> connectorMap;

    /**
     * Builds and starts a multi-use (non single-use) TCP client connection factory for one
     * connector.
     *
     * @param connector supplies the remote IP address and port
     * @param master    supplies the message header length and id passed to the
     *                  (de)serializer
     * @return a started connection factory whose bean name is {@code "<ip>:<port>"}
     */
    @Bean
    @Scope(value = "prototype")
    public AbstractClientConnectionFactory clientCF(Connector connector, Master master) {
        TcpNetClientConnectionFactory clientConnectionFactory =
                new TcpNetClientConnectionFactory(connector.getAqrIpAddr(), connector.getAqrIpPortNo());
        // One shared, long-lived connection instead of a new socket per message.
        clientConnectionFactory.setSingleUse(false);

        // The same object frames outgoing bytes and parses incoming bytes.
        MyByteArraySerializer obj = new MyByteArraySerializer(master.getAqrMsgHeaderLength(), master.getAqrId());
        clientConnectionFactory.setSerializer(obj);
        clientConnectionFactory.setDeserializer(obj);
        clientConnectionFactory.setSoKeepAlive(true);

        TcpMessageMapper tcpMessageMapper = new TcpMessageMapper();
        tcpMessageMapper.setCharset("ISO-8859-1");
        clientConnectionFactory.setMapper(tcpMessageMapper);

        // The bean name is what TcpConnectionEvent#getConnectionFactoryName() reports.
        clientConnectionFactory.setBeanName(connector.getAqrIpAddr() + ":" + connector.getAqrIpPortNo());
        clientConnectionFactory.afterPropertiesSet();
        clientConnectionFactory.start();
        return clientConnectionFactory;
    }

    /**
     * Builds and starts a client-mode sending handler bound to the given connection factory.
     *
     * @param connectionFactory the factory whose connection is used for sending
     * @return the started handler
     */
    @Bean
    @Scope(value = "prototype")
    public TcpSendingMessageHandler tcpOutGateway(AbstractClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler messageHandler = new TcpSendingMessageHandler();
        messageHandler.setConnectionFactory(connectionFactory);
        messageHandler.setClientMode(true);
        messageHandler.setTaskScheduler(getTaskScheduler());
        messageHandler.setStatsEnabled(true);
        messageHandler.afterPropertiesSet();
        messageHandler.start();
        return messageHandler;
    }

    /**
     * Builds and starts a client-mode receiving adapter bound to the given connection factory.
     * Received messages are sent to the {@code receive} channel.
     *
     * @param connectionFactory the factory whose connection is listened on
     * @return the started adapter
     */
    @Bean
    @Scope(value = "prototype")
    public TcpReceivingChannelAdapter tcpInGateway(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
        messageHandler.setConnectionFactory(connectionFactory);
        messageHandler.setClientMode(true);
        messageHandler.setOutputChannel(receive());
        messageHandler.setAutoStartup(true);
        messageHandler.setTaskScheduler(getTaskScheduler());
        messageHandler.afterPropertiesSet();
        messageHandler.start();
        return messageHandler;
    }

    /**
     * @return a new {@link ThreadPoolTaskScheduler}; prototype-scoped, so every request for
     *         this bean yields a separate scheduler
     */
    @Bean
    @Scope(value = "prototype")
    public TaskScheduler getTaskScheduler() {
        TaskScheduler ts = new ThreadPoolTaskScheduler();
        return ts;
    }

    /** @return the queue channel on which all receiving adapters publish inbound messages */
    @Bean
    public MessageChannel receive() {
        QueueChannel channel = new QueueChannel();
        return channel;
    }

    /** @return the default poller metadata, registered under {@link PollerMetadata#DEFAULT_POLLER} */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return new PollerMetadata();
    }

    /**
     * Creates the connection factory, receiving adapter and sending handler for every
     * configured connector.
     *
     * <p>A failure while wiring one connector is logged and that connector is skipped; the
     * remaining connectors are still set up. A failure while reading the connector's key or
     * its master record is not caught and propagates to the caller.
     *
     * @return map from connector primary key to its gateway objects; empty when no
     *         connectors are configured
     * @throws Exception if reading connector or master data fails
     */
    @Bean
    @Transactional
    public HashMap<ConnectorPK, GatewayAqr> gatewayAqr() throws Exception {
        connectorMap = new HashMap<>();

        // 1. Get List of Connections configured in Database
        List<Connector> connectors = connectorService.getConnections();
        if (connectors == null) {
            logger.warn("Connector service returned no connector list; no gateways created");
            return connectorMap;
        }

        for (Connector connector : connectors) {
            // 2. Get the connection details
            ConnectorPK connectorPK = connector.getConnectorpk();
            Master master = masterService.findById(connectorPK.getAcuirerId());

            try {
                // 3. Create object of TcpNetClientConnectionFactory for each Acquirer connection
                AbstractClientConnectionFactory clientConnectionFactory = clientCF(connector, master);

                // 4. Create the TcpReceivingChannelAdapter FIRST. Binding it to the factory is
                //    what registers the listener for inbound data. The client-mode sending
                //    handler tries to connect as soon as it is started; if that happens before
                //    a listener exists and the server is already up, the shared connection is
                //    opened without a listener and replies are dropped with
                //    "Unexpected message - no endpoint registered with connection interceptor".
                TcpReceivingChannelAdapter inHandler = tcpInGateway(clientConnectionFactory);

                // 5. Create TcpSendingMessageHandler for the Connection
                TcpSendingMessageHandler outHandler = tcpOutGateway(clientConnectionFactory);

                // 6. Generate the GatewayAqr object
                GatewayAqr gatewayAqr = new GatewayAqr(clientConnectionFactory, outHandler, inHandler);

                // 7. Put in the MAP the connector key and its gateway objects
                connectorMap.put(connectorPK, gatewayAqr);
            } catch (Exception e) {
                // Best effort: one bad connector must not prevent the others from starting,
                // but the failure has to be visible.
                logger.error("Failed to set up TCP gateway for connector {}", connectorPK, e);
            }
        }
        return connectorMap;
    }
}

*********************************************************************************************************************************
/**
 * Request/reply wiring on top of the TCP adapters.
 *
 * <p>Requests enter through {@link Gateway} on the {@code send} channel and are held by a
 * barrier. Raw responses arriving on the {@code receive} channel are deserialized, forwarded
 * to the {@code process} channel and used to trigger the barrier. Responses that the barrier
 * rejects as late are forwarded to {@code lateGoodresponseChannel}.
 */
@EnableIntegration
@IntegrationComponentScan(basePackageClasses = {GatewayEventConfig.class, GatewayAqrConfig.class})
@Configuration
@ComponentScan(basePackages = {"com.iz.zw.gateway.impl", "com.iz.zw.configuration"})
@Import({GatewayEventConfig.class, GatewayAqrConfig.class})
public class GatewayConfig {

    private static final Logger logger = LoggerFactory.getLogger(GatewayConfig.class);

    @Autowired
    private GatewayAsyncReply<Object, Message<?>> gatewayAsyncReply;

    @Autowired
    private GatewayCorrelationStrategy gatewayCorrelationStrategy;

    @Autowired
    private HashMap<ConnectorPK, GatewayAqr> gatewayAqrs;

    @Autowired
    ConnectorService connectorService;

    @Autowired
    GatewayResponseDeserializer gatewayResponseDeserializer;

    /** Messaging gateway whose calls are sent to the {@code send} channel. */
    @MessagingGateway(defaultRequestChannel = "send")
    public interface Gateway {
        void waitForResponse(TransactionMessage transaction);
    }

    /** @return the direct channel that feeds the barrier */
    @Bean
    public MessageChannel send() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    /**
     * @return the barrier consuming the {@code send} channel; constructed with a timeout
     *         argument of 25000 and the injected correlation strategy, with output going to
     *         {@code out} and discards going to {@code lateGoodresponseChannel}
     */
    @Bean
    @ServiceActivator(inputChannel = "send")
    public BarrierMessageHandlerWithLateGoodResponse barrier() {
        BarrierMessageHandlerWithLateGoodResponse barrier =
                new BarrierMessageHandlerWithLateGoodResponse(25000, this.gatewayCorrelationStrategy);
        barrier.setAsync(true);
        barrier.setOutputChannel(out());
        barrier.setDiscardChannel(lateGoodresponseChannel());
        return barrier;
    }

    /** Consumes the {@code out} channel and prints a fixed marker line. */
    @ServiceActivator(inputChannel = "out")
    public void printMessage(Message<?> message) {
        System.out.println("in out channel");
    }

    /**
     * Deserializes a raw response taken from the {@code receive} channel and forwards the
     * result to the {@code process} channel.
     *
     * @param response raw bytes received from the TCP connection
     * @return the deserialized transaction message
     */
    @Transformer(inputChannel = "receive", outputChannel = "process")
    public TransactionMessage convert(byte[] response) {
        // Guarded so the array is only rendered when debug logging is on.
        if (logger.isDebugEnabled()) {
            logger.debug("Response Received: {}", Arrays.toString(response));
        }
        return gatewayResponseDeserializer.deserializeResponse(response);
    }

    /**
     * @return handler for the {@code process} channel: stores the message in the async reply
     *         store and triggers the barrier; when the barrier reports a late response the
     *         message is forwarded to {@code lateGoodresponseChannel} instead
     */
    @ServiceActivator(inputChannel = "process")
    @Bean
    public MessageHandler releaser() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                try {
                    gatewayAsyncReply.put(message);
                    barrier().trigger(message);
                } catch (GatewayLateGoodMessageException exception) {
                    System.out.println("Late good response..!");
                    // NOTE(review): the returned value is discarded; presumably get() is called
                    // for a side effect on the reply store - confirm.
                    gatewayAsyncReply.get(message);
                    lateGoodresponseChannel().send(message);
                }
            }
        };
    }

    /** @return the queue channel carrying deserialized responses to {@link #releaser()} */
    @Bean
    public MessageChannel process() {
        QueueChannel channel = new QueueChannel();
        return channel;
    }

    /** @return the direct channel receiving the barrier's output */
    @Bean
    public MessageChannel out() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    /** @return the queue channel carrying late responses */
    @Bean
    public MessageChannel lateGoodresponseChannel() {
        QueueChannel channel = new QueueChannel();
        return channel;
    }

    /**
     * Consumes {@code lateGoodresponseChannel} and logs the late response.
     *
     * @param message the late message
     */
    @ServiceActivator(inputChannel="lateGoodresponseChannel")
    public void handleLateGoodResponse(Message<?> message) {
        String strSTAN = null;
        String strResponse = null;
        Message<?> respMessage = null;

        // NOTE(review): this tests the Message wrapper itself. If TransactionMessage is only
        // ever a payload (as in Gateway#waitForResponse), this branch never runs and the log
        // below always prints null - confirm whether message.getPayload() was intended, and
        // whether the String cast below should read respMessage's payload.
        if (message instanceof TransactionMessage) {
            strSTAN = ((TransactionMessage) message).getStan();
            respMessage = gatewayAsyncReply.get(strSTAN);

            if (null != respMessage) {
                strResponse = (String) message.getPayload();
            }
        }
        logger.info("Late Good Response: " + strResponse);
    }
}

*********************************************************************************************************************************

@Configuration
public class GatewayEventConfig {

    private static final Logger logger = LoggerFactory.getLogger(GatewayEventConfig.class);

    /**
     * Publishes TCP connection open, close and exception application events as messages on
     * {@code tcpEventChannel}. The producer is started before it is returned.
     */
    @Bean
    public ApplicationEventListeningMessageProducer tcpEventListener() {
        final Class<?>[] watchedEvents = {
            TcpConnectionOpenEvent.class,
            TcpConnectionCloseEvent.class,
            TcpConnectionExceptionEvent.class
        };

        final ApplicationEventListeningMessageProducer eventProducer =
                new ApplicationEventListeningMessageProducer();
        eventProducer.setEventTypes(watchedEvents);
        eventProducer.setOutputChannel(tcpEventChannel());
        eventProducer.setAutoStartup(true);
        eventProducer.setTaskScheduler(getEventTaskScheduler());
        eventProducer.start();
        return eventProducer;
    }

    /** Scheduler handed to the event producer. */
    @Bean
    public TaskScheduler getEventTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

    /** Queue channel that carries the TCP connection events. */
    @Bean
    public MessageChannel tcpEventChannel() {
        final QueueChannel eventQueue = new QueueChannel();
        return eventQueue;
    }

    /**
     * Handles one TCP connection event taken from {@code tcpEventChannel}: prints it and
     * branches on the name of the connection factory that raised it. Both branches are
     * placeholders with no action yet.
     *
     * @param event the connection event
     */
    @Transactional
    @ServiceActivator(inputChannel = "tcpEventChannel")
    public void tcpConnectionEvent(TcpConnectionEvent event) {
        System.out.println("In publishing" + event.toString());

        final String factoryName = event.getConnectionFactoryName();
        final boolean isFirstConnection = factoryName.equals("connection1");

        if (!isFirstConnection) {
            // send message to another connector
        } else {
            // send some message to connector
        }
    }
}

这是我的配置文件。我的应用程序一启动就会尝试连接 2 个服务器。如上所示,我为这 2 个服务器分别创建了配置类:GatewayAqrConfig1 和 GatewayConfig1 用于第一个服务器连接,GatewayAqrConfig2 和 GatewayConfig2 用于第二个服务器连接。我通过事件得知已连接到服务器,然后发送连接建立消息。如果服务器已经在运行,而我随后才启动应用程序,应用程序能够收到事件、建立连接并发送消息,但收不到响应,而是收到如下警告:

**WARN  TcpNetConnection:186 - Unexpected message - no endpoint registered with connection interceptor:**
    i.e. the connection does not register the listener properly

但是,如果我先启动应用程序、再启动服务器,就能正常收到响应。由于我连接的是外部服务器,我无法重启它们;我的应用程序应该能够连接到已经在运行的服务器。可能是什么问题呢?

使用的版本:

Spring Integration 版本:4.3.1

Spring 版本:4.3.2

JDK 1.8 JBOSS EAP 7

该 WARN 消息意味着,不知何故,在没有 TcpReceivingChannelAdapter 向连接工厂注册的情况下收到了入站消息。客户端模式应该没有区别。

更仔细地查看了您的代码后,我认为原型 bean 应该没问题,只要您是直接使用这些对象(尤其是 TcpMessageHandler),而不是通过框架来使用它们。

根据您的配置,我不太清楚这是怎么发生的;当您在接收适配器上调用 setConnectionFactory 时,侦听器就会被注册。

如果您能用一个精简的项目重现这个问题,并把它发布到某个地方,我会看一看。