Spring 集成 Java DSL 桥轮询两个不同的目录
Spring Integration Java DSL bridge to poll two different directories
我有安装文件 poller/channel 适配器,它使用 Java DSL 轮询目录和处理程序集成流。但是我没有得到任何参考如何将另一个 directory/channel 适配器和桥接添加到同一个处理程序。这是我的代码。
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDir)).
filter(new SimplePatternFileListFilter("*.csv")).
filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
handle(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
Spring 整合中的首批 class 公民之一是 MessageChannel
实体。您始终可以在 IntegrationFlow
定义中的端点之间设置显式通道,并向它们显式发送消息。
对于“合并”用例,我建议在 .handle()
之前放置一个 .channel()
并为第二个目录声明第二个流,但在该流的末尾使用相同的.channel()
将此流中的消息“桥接”到第一个流的中间。
在参考手册中查看更多信息:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/java-dsl.html#java-dsl-channels
谢谢@Artem。跟随怎么样?
@Bean
public IntegrationFlow integrationFlowUi(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDirUi)).
filter(new SimplePatternFileListFilter("*.csv")).
filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
channel("to-bridge").
handle(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
@Bean
public IntegrationFlow integrationFlowSftp(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDirSftp)).
filter(new SimplePatternFileListFilter("*.csv")).
filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
channel("to-bridge").get();
}
我有安装文件 poller/channel 适配器,它使用 Java DSL 轮询目录和处理程序集成流。但是我没有得到任何参考如何将另一个 directory/channel 适配器和桥接添加到同一个处理程序。这是我的代码。
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDir)).
filter(new SimplePatternFileListFilter("*.csv")).
filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
handle(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
Spring 整合中的首批 class 公民之一是 MessageChannel
实体。您始终可以在 IntegrationFlow
定义中的端点之间设置显式通道,并向它们显式发送消息。
对于“合并”用例,我建议在 .handle()
之前放置一个 .channel()
并为第二个目录声明第二个流,但在该流的末尾使用相同的.channel()
将此流中的消息“桥接”到第一个流的中间。
在参考手册中查看更多信息:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/java-dsl.html#java-dsl-channels
谢谢@Artem。跟随怎么样?
@Bean
public IntegrationFlow integrationFlowUi(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDirUi)).
filter(new SimplePatternFileListFilter("*.csv")).
filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
channel("to-bridge").
handle(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
@Bean
public IntegrationFlow integrationFlowSftp(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDirSftp)).
filter(new SimplePatternFileListFilter("*.csv")).
filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
channel("to-bridge").get();
}