Spring 集成路由无法发送消息
Spring integration route can't dispatch message
所以,问题出在路由器上。
当路由器尝试向频道发送消息时,我收到错误消息:Dispatcher 没有频道 'newTypingNotificationHandler.input' 的订阅者。但是我有这个频道名称的 integrationFlow defenition。
@Bean
public IntegrationFlow routeEcmIncomeRq(AbstractMessageRouter typeNotificationRouter) {
return IntegrationFlows.from(FROM_CHANNEL)
.routeToRecipients(r -> r
.recipientFlow(p -> p instanceof TypingNotificationDto,
f -> f.route(typeNotificationRouter)
)
.defaultOutputChannel(DEFAULT_SERVICE_CHANNEL)
).get();
}
@Bean
public AbstractMessageRouter typeNotificationRouter(IncomeRepository incomeRepository) {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
TypingNotificationDto messagePayload = (TypingNotificationDto) message.getPayload();
if (!incomeRepository.existsById(StringUtil.ecdFileIdToUuid(messagePayload.getEcdDocumentImage().getDocumentSource()))) {
return Collections.singleton(MessageChannels.direct("newTypingNotificationHandler.input").get());
} else {
return Collections.singleton(MessageChannels.direct("existsTypingNotificationHandler.input").get());
}
}
};
}
@Bean
public IntegrationFlow newTypingNotificationHandler() {
return f -> f.log("need's create new Income");
}
@Bean
public IntegrationFlow existsTypingNotificationHandler() {
return f -> f.log("exist income process");
}
stackTrace 原因:
org.springframework.integration.MessageDispatchingException:
Dispatcher has no subscribers at
org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:461)
~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at
org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:206)
~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
您每次返回一个新频道,而不是 Spring
管理的频道
MessageChannels.direct("newTypingNotificationHandler.input").get();
使用
return Collections.singleton(getChannelResolver().resolveDestination("newTypingNotificationHandler.input"));
相反。但是,最好在解析后缓存而不是为每条消息返回一个新集合。
所以,问题出在路由器上。 当路由器尝试向频道发送消息时,我收到错误消息:Dispatcher 没有频道 'newTypingNotificationHandler.input' 的订阅者。但是我有这个频道名称的 integrationFlow defenition。
@Bean
public IntegrationFlow routeEcmIncomeRq(AbstractMessageRouter typeNotificationRouter) {
return IntegrationFlows.from(FROM_CHANNEL)
.routeToRecipients(r -> r
.recipientFlow(p -> p instanceof TypingNotificationDto,
f -> f.route(typeNotificationRouter)
)
.defaultOutputChannel(DEFAULT_SERVICE_CHANNEL)
).get();
}
@Bean
public AbstractMessageRouter typeNotificationRouter(IncomeRepository incomeRepository) {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
TypingNotificationDto messagePayload = (TypingNotificationDto) message.getPayload();
if (!incomeRepository.existsById(StringUtil.ecdFileIdToUuid(messagePayload.getEcdDocumentImage().getDocumentSource()))) {
return Collections.singleton(MessageChannels.direct("newTypingNotificationHandler.input").get());
} else {
return Collections.singleton(MessageChannels.direct("existsTypingNotificationHandler.input").get());
}
}
};
}
@Bean
public IntegrationFlow newTypingNotificationHandler() {
return f -> f.log("need's create new Income");
}
@Bean
public IntegrationFlow existsTypingNotificationHandler() {
return f -> f.log("exist income process");
}
stackTrace 原因:
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:461) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:206) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
您每次返回一个新频道,而不是 Spring
管理的频道MessageChannels.direct("newTypingNotificationHandler.input").get();
使用
return Collections.singleton(getChannelResolver().resolveDestination("newTypingNotificationHandler.input"));
相反。但是,最好在解析后缓存而不是为每条消息返回一个新集合。