Spring 与 DSL 集成:文件出站通道适配器能否在间隔 10 分钟后创建文件
Spring Integration with DSL: Can File Outbound Channel Adapter create file after say 10 mins of interval
我有一个要求,我的应用程序应该从 MQ 读取消息并使用文件出站通道适配器写入。我希望我的每个输出文件都应该包含每 10 分钟间隔的消息。是否存在任何默认实现,或任何指向这样做的指针。
public @Bean IntegrationFlow defaultJmsFlow()
{
return IntegrationFlows.from(
//read JMS topic
Jms.messageDrivenChannelAdapter(this.connectionFactory).destination(this.config.getInputQueueName()).errorChannel(errorChannel()).configureListenerContainer(c ->
{
final DefaultMessageListenerContainer container = c.get();
container.setSessionTransacted(true);
container.setMaxMessagesPerTask(-1);
}).get())
.channel(messageProcessingChannel()).get();
}
public @Bean MessageChannel messageProcessingChannel()
{
return MessageChannels.queue().get();
}
public @Bean IntegrationFlow messageProcessingFlow() {
return IntegrationFlows.from(messageProcessingChannel())
.handle(Files.outboundAdapter(new File(config.getWorkingDir()))
.fileNameGenerator(fileNameGenerator())
.fileExistsMode(FileExistsMode.APPEND).appendNewLine(true))
.get();
}
首先,您可以在 FileWritingMessageHandler
和 fixedDelay
的端点上使用 QueueChannel
和 poller
等 10 分钟。但是,您应该记住,消息将在轮询器开始工作之前存储在内存中。因此,一旦您的应用程序崩溃,消息就会丢失。
另一方面,您可以使用具有类似 poller
配置的 JmsDestinationPollingSource
。但是,通过这种方式,您需要使用 maxMessagesPerPoll(-1)
对其进行配置,以让它在单个轮询任务期间从 MQ 中提取尽可能多的消息 - 每 10 分钟一次。
aggregator
及其 groupTimeout
选项可实现另一种变体。这样,在 10 分钟间隔过去之前,您不会收到来自聚合器的输出消息。然而,同样:默认情况下,商店在内存中。当我们已经有了 MQ 并且我们确实可以准确地轮询时,我不会再引入一个持久性存储来满足周期性需求。因此我会选择 JmsDestinationPollingSource
变体。
更新
Can you help me with how to set fixed delay in file outbound adapter.
由于您处理 QueueChannel
,您需要为 "fixed delay" 配置一个 PollingConsumer
端点。这个真的属于那个频道的订阅者。实际上它是一个 .handle(Files.outboundAdapter)
部分。只有您缺少的 Poller
是端点的选项,而不是 MessageHandler
。考虑使用重载的 handle()
变体:
.handle(Files.outboundAdapter(new File(config.getWorkingDir()))
.fileNameGenerator(fileNameGenerator())
.fileExistsMode(FileExistsMode.APPEND).appendNewLine(true),
e -> e.poller(p -> p.fixedDelay(10000)))
Or a sample example for JMSDestinationPollingSource
@Bean
public IntegrationFlow jmsInboundFlow() {
return IntegrationFlows
.from(Jms.inboundAdapter(cachingConnectionFactory())
.destination("jmsInbound"),
e -> e.poller(p -> p.fixedDelay(10000)))
.<String, String>transform(String::toUpperCase)
.channel(jmsOutboundInboundReplyChannel())
.get();
}
我有一个要求,我的应用程序应该从 MQ 读取消息并使用文件出站通道适配器写入。我希望我的每个输出文件都应该包含每 10 分钟间隔的消息。是否存在任何默认实现,或任何指向这样做的指针。
public @Bean IntegrationFlow defaultJmsFlow()
{
return IntegrationFlows.from(
//read JMS topic
Jms.messageDrivenChannelAdapter(this.connectionFactory).destination(this.config.getInputQueueName()).errorChannel(errorChannel()).configureListenerContainer(c ->
{
final DefaultMessageListenerContainer container = c.get();
container.setSessionTransacted(true);
container.setMaxMessagesPerTask(-1);
}).get())
.channel(messageProcessingChannel()).get();
}
public @Bean MessageChannel messageProcessingChannel()
{
return MessageChannels.queue().get();
}
public @Bean IntegrationFlow messageProcessingFlow() {
return IntegrationFlows.from(messageProcessingChannel())
.handle(Files.outboundAdapter(new File(config.getWorkingDir()))
.fileNameGenerator(fileNameGenerator())
.fileExistsMode(FileExistsMode.APPEND).appendNewLine(true))
.get();
}
首先,您可以在 FileWritingMessageHandler
和 fixedDelay
的端点上使用 QueueChannel
和 poller
等 10 分钟。但是,您应该记住,消息将在轮询器开始工作之前存储在内存中。因此,一旦您的应用程序崩溃,消息就会丢失。
另一方面,您可以使用具有类似 poller
配置的 JmsDestinationPollingSource
。但是,通过这种方式,您需要使用 maxMessagesPerPoll(-1)
对其进行配置,以让它在单个轮询任务期间从 MQ 中提取尽可能多的消息 - 每 10 分钟一次。
aggregator
及其 groupTimeout
选项可实现另一种变体。这样,在 10 分钟间隔过去之前,您不会收到来自聚合器的输出消息。然而,同样:默认情况下,商店在内存中。当我们已经有了 MQ 并且我们确实可以准确地轮询时,我不会再引入一个持久性存储来满足周期性需求。因此我会选择 JmsDestinationPollingSource
变体。
更新
Can you help me with how to set fixed delay in file outbound adapter.
由于您处理 QueueChannel
,您需要为 "fixed delay" 配置一个 PollingConsumer
端点。这个真的属于那个频道的订阅者。实际上它是一个 .handle(Files.outboundAdapter)
部分。只有您缺少的 Poller
是端点的选项,而不是 MessageHandler
。考虑使用重载的 handle()
变体:
.handle(Files.outboundAdapter(new File(config.getWorkingDir()))
.fileNameGenerator(fileNameGenerator())
.fileExistsMode(FileExistsMode.APPEND).appendNewLine(true),
e -> e.poller(p -> p.fixedDelay(10000)))
Or a sample example for
JMSDestinationPollingSource
@Bean
public IntegrationFlow jmsInboundFlow() {
return IntegrationFlows
.from(Jms.inboundAdapter(cachingConnectionFactory())
.destination("jmsInbound"),
e -> e.poller(p -> p.fixedDelay(10000)))
.<String, String>transform(String::toUpperCase)
.channel(jmsOutboundInboundReplyChannel())
.get();
}