如何使用 Java DSL 配置带有 Poller 的 QueueChannel?
How can I configure a QueueChannel with Poller using Java DSL?
我们曾经在 int:chain 中配置轮询器,如下所示,inboundChannel 配置为 queue
<int:chain id="messageProcessChain" input-channel="inboundChannel" >
<int:poller fixed-delay="1" max-messages-per-poll="1"/>
这一次,我们想以编程方式初始化 int 流,因此使用 DSL。我的测试代码如下:
Jms.messageDrivenChannelAdapter(inConnFactory).destination(flowProperties.getInputQueue()))
.channel(MessageChannels.queue(mongoDbChannelMessageStore,"groupId"))
// .enrichHeaders(testHeaders)
.handle(m -> {
System.out.println("test handler");
logger.info("test dsl flow:"+m.getPayload().toString());
},c -> c.poller(Pollers.fixedRate(flowProperties.getInboudFixedRate()).maxMessagesPerPoll(10)))
.get()
它无法与 ".enrichHeaders(testHeaders)" 一起使用,因为“没有为端点定义轮询器”,但我也不能为 header 更丰富,因为它有一个隐含的 SubscribableChannel?
这样的话,只能用网桥连接两个通道吗?还有其他方法吗?
使用.enrichHeaders(headers, e -> e.poller(...))
.
但是,您不应使用带有 message-driven 通道适配器的队列通道,如果发生故障,您可能会丢失消息。
要实现并发,请增加适配器上的并发。
我们曾经在 int:chain 中配置轮询器,如下所示,inboundChannel 配置为 queue
<int:chain id="messageProcessChain" input-channel="inboundChannel" >
<int:poller fixed-delay="1" max-messages-per-poll="1"/>
这一次,我们想以编程方式初始化 int 流,因此使用 DSL。我的测试代码如下:
Jms.messageDrivenChannelAdapter(inConnFactory).destination(flowProperties.getInputQueue()))
.channel(MessageChannels.queue(mongoDbChannelMessageStore,"groupId"))
// .enrichHeaders(testHeaders)
.handle(m -> {
System.out.println("test handler");
logger.info("test dsl flow:"+m.getPayload().toString());
},c -> c.poller(Pollers.fixedRate(flowProperties.getInboudFixedRate()).maxMessagesPerPoll(10)))
.get()
它无法与 ".enrichHeaders(testHeaders)" 一起使用,因为“没有为端点定义轮询器”,但我也不能为 header 更丰富,因为它有一个隐含的 SubscribableChannel?
这样的话,只能用网桥连接两个通道吗?还有其他方法吗?
使用.enrichHeaders(headers, e -> e.poller(...))
.
但是,您不应使用带有 message-driven 通道适配器的队列通道,如果发生故障,您可能会丢失消息。
要实现并发,请增加适配器上的并发。