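Spring Integration DSL: Dispatcher has no subscribers
I need to receive zip files over SFTP. We have to archive each file as-is and also process its contents after unzipping it. The code for the main flow and the sub-flow is below. agentDataArchiveChannelAdapter() works fine, but for the other channel I get the error below. What could be causing it, and how can I fix it? My assumption was that surancebayAgentDemographicFlow() would put the message on the direct channel and that processing would continue from there as described.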
Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]
2018-09-24 12:16:22.004 DEBUG 17536 --- [ask-scheduler-2] o.s.i.channel.PublishSubscribeChannel : postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.thirdpartyAgentDemographicFlow-Processing'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], headers={id=9e342354-8436-e1de-774e-937c8b6809d5, timestamp=1537816582001}] for original GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]
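For readers unfamiliar with this message: a point-to-point channel hands each message to a subscribed handler, and sending fails when no handler has subscribed. The following is a minimal plain-Java sketch of that idea only. ToyDirectChannel is a hypothetical class written for illustration; it is not Spring Integration's DirectChannel, and it ignores the load balancing the real dispatcher performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy model of a point-to-point channel (NOT a Spring class).
final class ToyDirectChannel {
    private final String name;
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    ToyDirectChannel(String name) {
        this.name = name;
    }

    // Register a handler that will consume messages sent to this channel.
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Deliver the payload to a subscriber, or fail if nobody is listening.
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException(
                    "Dispatcher has no subscribers for channel '" + name + "'");
        }
        // Simplification: always use the first subscriber.
        subscribers.get(0).accept(payload);
    }
}
```

In this model, the log above corresponds to calling send() on the channel named thirdpartyAgentDemographicFlow-Processing before anything has called subscribe() on it.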
Code / integration flow:
@Bean("sftpAgentInboundFlow")
public IntegrationFlow sftpAgentInboundFlow(SessionFactory<LsEntry> sftpSessionFactory) {
    return IntegrationFlows
            .from(Sftp.inboundAdapter(sftpSessionFactory)
                            .deleteRemoteFiles(false)
                            .preserveTimestamp(true)
                            .remoteDirectory(agentRemoteDir)
                            .filter(new AcceptOnceFileListFilter<>())
                            // The backslash must be doubled inside a Java string literal.
                            .regexFilter(".*\\.zip$")
                            .localDirectory(new File(inputDir))
                            .autoCreateLocalDirectory(true)
                            .maxFetchSize(1),
                    consumer -> consumer.id("sftpInboundAdapter")
                            .autoStartup(false)
                            .poller(Pollers.fixedDelay(scanFrequency, TimeUnit.SECONDS)))
            .publishSubscribeChannel(pubSub -> pubSub
                    .id("AgentInboundDemographic-PubSub-Channel")
                    // First subscriber: archive the zip file as-is.
                    .subscribe(flow -> flow.bridge(e -> e.id("ziparchiver")).handle(agentDataArchiveChannelAdapter()))
                    // Second subscriber: the external flow that unzips and processes the file.
                    .subscribe(surancebayAgentDemographicFlow())
            )
            .get();
}

//@Bean("surancebayAgentDemographicFlow")
public IntegrationFlow surancebayAgentDemographicFlow() {
    return IntegrationFlows
            //.from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)/*.maxMessagesPerPoll(corepoolsize)*/))
            .from(MessageChannels.direct("thirdpartyAgentDemographicFlow-Processing"))
            .transform(unZipTransformer())
            .split(splitter())
            .channel(MessageChannels.executor(taskExecutor()))
            // Route each extracted file by name to the individual or firm processor.
            .<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
                    .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                    .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
            )
            // All messages share one constant correlation key, so they form a single group.
            .aggregate(aggregator -> aggregator.groupTimeout(messageGroupWaiting).correlationStrategy(new CorrelationStrategy() {
                @Override
                public Object getCorrelationKey(Message<?> message) {
                    return "processdate";
                }
            }).sendPartialResultOnExpiry(true))
            .handle("agentDemograpicOutput", "generateAgentDemographicFile")
            .channel(confirmChannel())
            .get();
}
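OK! I think your problem is that you are using a Spring Integration version in which using an external IntegrationFlow as a sub-flow is not implemented yet. Either consider upgrading to the latest version, or use a workaround: .subscribe("thirdpartyAgentDemographicFlow-Processing"), and uncomment the @Bean annotation on the surancebayAgentDemographicFlow definition.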
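The workaround can be sketched roughly as follows. This is a configuration fragment under stated assumptions, not the poster's actual code. It assumes the Spring Integration 5.x Java DSL is on the classpath. To my knowledge PublishSubscribeSpec.subscribe takes an IntegrationFlow rather than a channel name, so the sketch uses a lambda sub-flow that forwards to the named channel. The class name, the bean names, the "sketchInput" channel, and the println handlers are all hypothetical stand-ins for the SFTP adapter, the archiver, and the unzip pipeline.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
@EnableIntegration
public class WorkaroundSketchConfig {

    // Stand-in for sftpAgentInboundFlow; "sketchInput" is an assumed channel name.
    @Bean
    public IntegrationFlow mainFlowSketch() {
        return IntegrationFlows
                .from("sketchInput")
                .publishSubscribeChannel(pubSub -> pubSub
                        // Stand-in for the archiving subscriber.
                        .subscribe(f -> f.handle(m -> System.out.println("archive: " + m.getPayload())))
                        // Forward to the processing flow by channel name
                        // instead of passing the external flow object.
                        .subscribe(f -> f.channel("thirdpartyAgentDemographicFlow-Processing")))
                .get();
    }

    // Registered as a bean so that the framework subscribes this flow
    // to its input channel; this mirrors uncommenting @Bean in the question.
    @Bean
    public IntegrationFlow subFlowSketch() {
        return IntegrationFlows
                .from("thirdpartyAgentDemographicFlow-Processing")
                // Stand-in for the unzip, split, route, and aggregate steps.
                .handle(m -> System.out.println("process: " + m.getPayload()))
                .get();
    }
}
```

The point of the sketch is that the processing flow only gets a subscriber on its input channel once it is a registered bean; referring to the channel by name then decouples the two flows.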