收到消息时 MessageDeliveryException: Dispatcher has no subscribers
When receiving message MessageDeliveryException: Dispatcher has no subscribers
更新:
在将 fromTcp() 从持有者 class 移动到端点 class 后,它再次运行良好,现在我很困惑为什么它现在还能工作,因为对我来说它为什么工作没有意义
经过一些头痛并在所有 Whosebug 中搜索后,当我收到来自服务器的消息时,我发现调度程序没有订阅者,但当我发回响应时一切正常。
连接生成器:
@Service
@EnableIntegration
public class TcpConnectionsHolder {
@Autowired
private IntegrationFlowContext flowContext;
/**
* Definition of flow channels
*
* @return MessageChannel
*/
@Bean
public MessageChannel fromTcp() {
final DirectChannel channel = new DirectChannel();
channel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
// Parse Message byte[] to StringHex
final byte[] bMessagePayload = (byte[]) message.getPayload();
return MessageBuilder.withPayload(Hex.encodeHexString(bMessagePayload))
.copyHeaders(message.getHeaders()).build();
}
});
return channel;
}
private final LinkedHashMap<String, TcpNetClientConnectionFactory> clientConnect =
new LinkedHashMap<String, TcpNetClientConnectionFactory>();
private final LinkedHashMap<String, TcpReceivingChannelAdapter> reciverAdapter =
new LinkedHashMap<String, TcpReceivingChannelAdapter>();
private final LinkedHashMap<String, MessageChannel> sendingAdpater =
new LinkedHashMap<String, MessageChannel>();
public MessageChannel getMessageChannel(String host, int port) {
return sendingAdpater.get(host+port);
}
public TcpReceivingChannelAdapter getReceiverChannel(String host, int port) {
return reciverAdapter.get(host+port);
}
private TcpNetClientConnectionFactory getclientConnectionFactory(String host, int port,int headBytes) {
TcpNetClientConnectionFactory cf = clientConnect.get(host+port);
if(cf==null) {
cf = new TcpNetClientConnectionFactory(host, port);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setSerializer(by);
cf.setDeserializer(by);
clientConnect.put(host+port,cf);
}
return cf;
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port) {
return addReceiverChannel(host,port,2,2000);
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port,int headBytes,int retryInterval) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
adapter.setConnectionFactory(cf);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
IntegrationFlow flow = IntegrationFlows.from(adapter).get();
this.flowContext.registration(flow).id(host+port + ".in").addBean(cf).register();
this.reciverAdapter.put(host+port, adapter);
return adapter;
}
public MessageChannel addSendingChannel(String host, int port) {
return addSendingChannel(host,port,2);
}
public MessageChannel addSendingChannel(String host, int port,int headBytes) {
TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
sender.setConnectionFactory(getclientConnectionFactory(host,port,headBytes));
IntegrationFlow flow = f -> f.handle(sender);
IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow).id(host+port + ".out").register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.sendingAdpater.put(host+port, inputChannel);
return inputChannel;
}
public void removeReceiverChannel(String host, int port) {
this.reciverAdapter.remove(host+port);
this.flowContext.remove(host+port + ".in");
}
public void removeSendingChannel(String host, int port) {
this.sendingAdpater.remove(host+port);
this.flowContext.remove(host+port + ".out");
}
}
消息端点:
@Configuration
@MessageEndpoint
public class BridgeMessageEndpoint {
private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMessageEndpoint.class);
@Autowired
private ApplicationContext applicationContext;
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
private void sendToApi(final String inMessage, final Map<String, Object> headerMap) {
LOGGER.debug("Recuperando el mensaje Hex {}", inMessage);
final PaymentOrder paymentOrder = new PaymentOrder();
paymentOrder.setMessage(inMessage);
final SplitterRestClient splitterRestClient = applicationContext.getBean(SplitterRestClient.class);
splitterRestClient.reportPayment(paymentOrder, headerMap);
}
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler logger() {
final LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.DEBUG.name());
loggingHandler.setLoggerName("Log");
return loggingHandler;
}
@Bean
public IntegrationFlow toTcp() {
return f -> f.route(new TcpRouter());
}
}
然后发起持有者:
@Component
public class TcpConnectionsController implements CommandLineRunner{
@Autowired
private TcpConnectionsHolder holder;
@Autowired
private ListNodeConfig listNodes;
@Value("${socket.tcp.headBytes}")
private int headBytes;
@Value("${socket.tcp.retryInterval}")
private int retryInterval;
@Override
public void run(String... args) throws Exception {
for(Node node:listNodes.getNodes()) {
holder.addReceiverChannel(node.getIp(), node.getPort(),headBytes,retryInterval);
holder.addSendingChannel(node.getIp(), node.getPort(),headBytes);
}
}
}
我知道问题出在 fromTcp() 中,但尽管我搜索了很多,但我无法看出问题或错误所在。
编辑
完整的堆栈跟踪:
[INFO ] (DefaultLifecycleProcessor.java:343) org.springframework.context.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018/06/22 13:56:47,454 [INFO ] (DocumentationPluginsBootstrapper.java:151) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Context refreshed
2018/06/22 13:56:47,504 [INFO ] (DocumentationPluginsBootstrapper.java:154) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2018/06/22 13:56:47,555 [INFO ] (ApiListingReferenceScanner.java:41) springfox.documentation.spring.web.scanners.ApiListingReferenceScanner : Scanning for api listing references
2018/06/22 13:56:47,826 [INFO ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Initializing ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,827 [INFO ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Starting ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,829 [INFO ] (DirectJDKLog.java:180) org.apache.tomcat.util.net.NioSelectorPool : Using a shared selector for servlet write/read
2018/06/22 13:56:47,836 [INFO ] (TomcatEmbeddedServletContainer.java:201) org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2018/06/22 13:56:47,902 [INFO ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,922 [INFO ] (AbstractEndpoint.java:120) org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#0
2018/06/22 13:56:47,949 [INFO ] (EventDrivenConsumer.java:108) org.springframework.integration.endpoint.EventDrivenConsumer : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the '180.112.19.1153115.out.input' channel
2018/06/22 13:56:47,950 [INFO ] (AbstractSubscribableChannel.java:81) org.springframework.integration.channel.DirectChannel : Channel 'ck-da-bridge:local:8080.180.112.19.1153115.out.input' has 1 subscriber(s).
2018/06/22 13:56:47,950 [INFO ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,950 [INFO ] (AbstractEndpoint.java:120) org.springframework.integration.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2018/06/22 13:56:47,953 [INFO ] (StartupInfoLogger.java:57) com.santander.ck.bridge.spring.Application : Started Application in 13.078 seconds (JVM running for 14.159)
2018/06/22 13:57:01,164 [ERROR ] (LoggingHandler.java:192) org.springframework.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'ck-da-bridge:local:8080.180.112.19.1153115.in.channel#0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}], failedMessage=GenericMessage [payload=f0f000022f0f6e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:88)
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
... 11 more
您可以为 org.springframework.integration
类别启用 DEBUG 日志记录级别并跟踪消息的传输方式。
还有一个 Message History 模式供您跟踪 headers 中的消息路径。通过这种方式,您将能够看到您的消息已通过哪些渠道,以及它在何处卡住了 Dispatcher has no subscribers
。
尽管您需要在每次 IntegrationFlow
注册后重新启动 MessageHistoryConfigurer
。
欢迎就此事提出 JIRA,让消息历史记录跟踪动态流。
更新
好的。拥有这样的简单代码:
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
adapter.setConnectionFactory(cf);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
IntegrationFlow flow = IntegrationFlows.from(adapter).get();
并且在这个 handler
之后没有更多的组件这一事实(adn 堆栈跟踪确实证实了这一点),我假设您使用了一些 Spring 集成 Java DSL 版本而没有从提供的 MessageProducer.getOutputChannel()
.
中提取 outputChannel
的正确修复
为了解决这个问题,我建议您像这样修改代码:
IntegrationFlow flow = IntegrationFlows.from(adapter).channel(fromTcp()).get();
我的意思是将通道引用从 TcpReceivingChannelAdapter
定义移动到流程中。或者只是尝试将 spring-integration-java-dsl
依赖项升级到最新的 1.2.3
版本!
更新 2
关于我们目前已有的代码的另一个想法。
您从 CommandLineRunner
注册了一个 TcpReceivingChannelAdapter
。这样 IntegrationFlow
会自动启动并准备好通过 TCP 接收数据,但同时 @ServiceActivator(inputChannel = "fromTcp")
尚未启动以使用来自上述通道的消息。这就是您获得 Dispatcher has no subscribers
的方式。或者你应该用 autoStartup(false)
破坏你的 this.flowContext.registration(flow)
并让它在正常的 auto-startup 阶段最近启动,已经与服务激活器一起。或者你不应该对这样的逻辑使用 CommandLineRunner
方法:从那里开始一些 activity.
真的太早了
更新: 在将 fromTcp() 从持有者 class 移动到端点 class 后,它再次运行良好,现在我很困惑为什么它现在还能工作,因为对我来说它为什么工作没有意义
经过一些头痛并在所有 Whosebug 中搜索后,当我收到来自服务器的消息时,我发现调度程序没有订阅者,但当我发回响应时一切正常。
连接生成器:
@Service
@EnableIntegration
public class TcpConnectionsHolder {
@Autowired
private IntegrationFlowContext flowContext;
/**
* Definition of flow channels
*
* @return MessageChannel
*/
@Bean
public MessageChannel fromTcp() {
final DirectChannel channel = new DirectChannel();
channel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
// Parse Message byte[] to StringHex
final byte[] bMessagePayload = (byte[]) message.getPayload();
return MessageBuilder.withPayload(Hex.encodeHexString(bMessagePayload))
.copyHeaders(message.getHeaders()).build();
}
});
return channel;
}
private final LinkedHashMap<String, TcpNetClientConnectionFactory> clientConnect =
new LinkedHashMap<String, TcpNetClientConnectionFactory>();
private final LinkedHashMap<String, TcpReceivingChannelAdapter> reciverAdapter =
new LinkedHashMap<String, TcpReceivingChannelAdapter>();
private final LinkedHashMap<String, MessageChannel> sendingAdpater =
new LinkedHashMap<String, MessageChannel>();
public MessageChannel getMessageChannel(String host, int port) {
return sendingAdpater.get(host+port);
}
public TcpReceivingChannelAdapter getReceiverChannel(String host, int port) {
return reciverAdapter.get(host+port);
}
private TcpNetClientConnectionFactory getclientConnectionFactory(String host, int port,int headBytes) {
TcpNetClientConnectionFactory cf = clientConnect.get(host+port);
if(cf==null) {
cf = new TcpNetClientConnectionFactory(host, port);
final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setSerializer(by);
cf.setDeserializer(by);
clientConnect.put(host+port,cf);
}
return cf;
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port) {
return addReceiverChannel(host,port,2,2000);
}
public TcpReceivingChannelAdapter addReceiverChannel(String host, int port,int headBytes,int retryInterval) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
adapter.setConnectionFactory(cf);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
IntegrationFlow flow = IntegrationFlows.from(adapter).get();
this.flowContext.registration(flow).id(host+port + ".in").addBean(cf).register();
this.reciverAdapter.put(host+port, adapter);
return adapter;
}
public MessageChannel addSendingChannel(String host, int port) {
return addSendingChannel(host,port,2);
}
public MessageChannel addSendingChannel(String host, int port,int headBytes) {
TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
sender.setConnectionFactory(getclientConnectionFactory(host,port,headBytes));
IntegrationFlow flow = f -> f.handle(sender);
IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow).id(host+port + ".out").register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.sendingAdpater.put(host+port, inputChannel);
return inputChannel;
}
public void removeReceiverChannel(String host, int port) {
this.reciverAdapter.remove(host+port);
this.flowContext.remove(host+port + ".in");
}
public void removeSendingChannel(String host, int port) {
this.sendingAdpater.remove(host+port);
this.flowContext.remove(host+port + ".out");
}
}
消息端点:
@Configuration
@MessageEndpoint
public class BridgeMessageEndpoint {
private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMessageEndpoint.class);
@Autowired
private ApplicationContext applicationContext;
@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
sendToApi(inMessage, headerMap);
}
private void sendToApi(final String inMessage, final Map<String, Object> headerMap) {
LOGGER.debug("Recuperando el mensaje Hex {}", inMessage);
final PaymentOrder paymentOrder = new PaymentOrder();
paymentOrder.setMessage(inMessage);
final SplitterRestClient splitterRestClient = applicationContext.getBean(SplitterRestClient.class);
splitterRestClient.reportPayment(paymentOrder, headerMap);
}
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler logger() {
final LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.DEBUG.name());
loggingHandler.setLoggerName("Log");
return loggingHandler;
}
@Bean
public IntegrationFlow toTcp() {
return f -> f.route(new TcpRouter());
}
}
然后发起持有者:
@Component
public class TcpConnectionsController implements CommandLineRunner{
@Autowired
private TcpConnectionsHolder holder;
@Autowired
private ListNodeConfig listNodes;
@Value("${socket.tcp.headBytes}")
private int headBytes;
@Value("${socket.tcp.retryInterval}")
private int retryInterval;
@Override
public void run(String... args) throws Exception {
for(Node node:listNodes.getNodes()) {
holder.addReceiverChannel(node.getIp(), node.getPort(),headBytes,retryInterval);
holder.addSendingChannel(node.getIp(), node.getPort(),headBytes);
}
}
}
我知道问题出在 fromTcp() 中,但尽管我搜索了很多,但我无法看出问题或错误所在。
编辑 完整的堆栈跟踪:
[INFO ] (DefaultLifecycleProcessor.java:343) org.springframework.context.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018/06/22 13:56:47,454 [INFO ] (DocumentationPluginsBootstrapper.java:151) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Context refreshed
2018/06/22 13:56:47,504 [INFO ] (DocumentationPluginsBootstrapper.java:154) springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2018/06/22 13:56:47,555 [INFO ] (ApiListingReferenceScanner.java:41) springfox.documentation.spring.web.scanners.ApiListingReferenceScanner : Scanning for api listing references
2018/06/22 13:56:47,826 [INFO ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Initializing ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,827 [INFO ] (DirectJDKLog.java:180) org.apache.coyote.http11.Http11NioProtocol : Starting ProtocolHandler ["http-nio-8080"]
2018/06/22 13:56:47,829 [INFO ] (DirectJDKLog.java:180) org.apache.tomcat.util.net.NioSelectorPool : Using a shared selector for servlet write/read
2018/06/22 13:56:47,836 [INFO ] (TomcatEmbeddedServletContainer.java:201) org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2018/06/22 13:56:47,902 [INFO ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,922 [INFO ] (AbstractEndpoint.java:120) org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#0
2018/06/22 13:56:47,949 [INFO ] (EventDrivenConsumer.java:108) org.springframework.integration.endpoint.EventDrivenConsumer : Adding {ip:tcp-outbound-channel-adapter} as a subscriber to the '180.112.19.1153115.out.input' channel
2018/06/22 13:56:47,950 [INFO ] (AbstractSubscribableChannel.java:81) org.springframework.integration.channel.DirectChannel : Channel 'ck-da-bridge:local:8080.180.112.19.1153115.out.input' has 1 subscriber(s).
2018/06/22 13:56:47,950 [INFO ] (AbstractConnectionFactory.java:481) org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory : started 180.112.19.1153115.inorg.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory#0, host=180.112.19.115, port=3115
2018/06/22 13:56:47,950 [INFO ] (AbstractEndpoint.java:120) org.springframework.integration.endpoint.EventDrivenConsumer : started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2018/06/22 13:56:47,953 [INFO ] (StartupInfoLogger.java:57) com.santander.ck.bridge.spring.Application : Started Application in 13.078 seconds (JVM running for 14.159)
2018/06/22 13:57:01,164 [ERROR ] (LoggingHandler.java:192) org.springframework.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'ck-da-bridge:local:8080.180.112.19.1153115.in.channel#0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}], failedMessage=GenericMessage [payload=f0f000022f0f6e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter.onMessage(TcpReceivingChannelAdapter.java:88)
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=f0f8f0f0c2200000800000020400000000000000f0f5f9f9f1f0f3f0f6f2f2f1f3f5f6f5f5f0f0f0f0f1f2f0f6f0f2f2f0f2f0f0f0f9d4c3c3f0f1f1f1e3f0f0f8f2, headers={ip_tcp_remotePort=3115, ip_connectionId=amm012059amm.scisb.isban.corp:3115:53402:3722e84f-49ce-4e77-a4a1-6e82cefe58b4, ip_localInetAddress=/180.101.130.119, ip_address=180.112.19.115, id=2c6e2e09-9faa-a56e-3921-edf7a03a9a3c, ip_hostname=amm012059amm.scisb.isban.corp, timestamp=1529668621142}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
... 11 more
您可以为 org.springframework.integration
类别启用 DEBUG 日志记录级别并跟踪消息的传输方式。
还有一个 Message History 模式供您跟踪 headers 中的消息路径。通过这种方式,您将能够看到您的消息已通过哪些渠道,以及它在何处卡住了 Dispatcher has no subscribers
。
尽管您需要在每次 IntegrationFlow
注册后重新启动 MessageHistoryConfigurer
。
欢迎就此事提出 JIRA,让消息历史记录跟踪动态流。
更新
好的。拥有这样的简单代码:
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
TcpNetClientConnectionFactory cf = getclientConnectionFactory(host,port,headBytes);
adapter.setConnectionFactory(cf);
adapter.setClientMode(true);
adapter.setErrorChannelName("errorChannel");
adapter.setRetryInterval(retryInterval);
adapter.setOutputChannel(fromTcp());
IntegrationFlow flow = IntegrationFlows.from(adapter).get();
并且在这个 handler
之后没有更多的组件这一事实(adn 堆栈跟踪确实证实了这一点),我假设您使用了一些 Spring 集成 Java DSL 版本而没有从提供的 MessageProducer.getOutputChannel()
.
outputChannel
的正确修复
为了解决这个问题,我建议您像这样修改代码:
IntegrationFlow flow = IntegrationFlows.from(adapter).channel(fromTcp()).get();
我的意思是将通道引用从 TcpReceivingChannelAdapter
定义移动到流程中。或者只是尝试将 spring-integration-java-dsl
依赖项升级到最新的 1.2.3
版本!
更新 2
关于我们目前已有的代码的另一个想法。
您从 CommandLineRunner
注册了一个 TcpReceivingChannelAdapter
。这样 IntegrationFlow
会自动启动并准备好通过 TCP 接收数据,但同时 @ServiceActivator(inputChannel = "fromTcp")
尚未启动以使用来自上述通道的消息。这就是您获得 Dispatcher has no subscribers
的方式。或者你应该用 autoStartup(false)
破坏你的 this.flowContext.registration(flow)
并让它在正常的 auto-startup 阶段最近启动,已经与服务激活器一起。或者你不应该对这样的逻辑使用 CommandLineRunner
方法:从那里开始一些 activity.