在消息通道之间发送消息时不会发生延迟
No delay happening when sending messages between message channels
我是 Spring 集成 DSL 的新手。目前,我正在尝试添加延迟
在消息通道之间- "ordersChannel" 和 "bookItemsChannel"。但是,流程继续进行,就好像没有延迟一样。
任何帮助表示赞赏。
这是代码:
@Bean
public IntegrationFlow ordersFlow() {
return IntegrationFlows.from("ordersChannel")
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> message) {
return ((Order)message.getPayload()).getOrderItems();
}
})
.delay("normalMessage", new Consumer<DelayerEndpointSpec>() {
public void accept(DelayerEndpointSpec spec) {
spec.id("delayChannel");
spec.defaultDelay(50000000);
System.out.println("Going to delay");
}
})
.channel("bookItemsChannel")
.get();
}
在我看来,当您看到 System.out.println("Going to delay");
时,混合了 init
阶段和真正的运行时,当每个传入消息发生延迟时。
我们在 DSL 项目中有一些延迟测试用例,但我刚刚写了这个来证明 defaultDelay
工作正常:
@Bean
public IntegrationFlow ordersFlow() {
return f -> f
.split()
.delay("normalMessage", (DelayerEndpointSpec e) -> e.defaultDelay(5000))
.channel(c -> c.queue("bookItemsChannel"));
}
...
@Autowired
@Qualifier("ordersFlow.input")
private MessageChannel ordersFlowInput;
@Autowired
@Qualifier("bookItemsChannel")
private PollableChannel bookItemsChannel;
@Test
public void ordersDelayTests() {
this.ordersFlowInput.send(new GenericMessage<>(new String[] {"foo", "bar", "baz"}));
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Message<?> receive = this.bookItemsChannel.receive(10000);
assertNotNull(receive);
receive = this.bookItemsChannel.receive(10000);
assertNotNull(receive);
receive = this.bookItemsChannel.receive(10000);
assertNotNull(receive);
stopWatch.stop();
assertThat(stopWatch.getTotalTimeMillis(), greaterThanOrEqualTo(5000L));
}
如您所见,它与您的配置非常接近,但这并不能证明我们在 .delay()
.
附近有问题
所以,一个意想不到的问题,最好提供类似的东西来确认一下。
我是 Spring 集成 DSL 的新手。目前,我正在尝试添加延迟 在消息通道之间- "ordersChannel" 和 "bookItemsChannel"。但是,流程继续进行,就好像没有延迟一样。 任何帮助表示赞赏。 这是代码:
@Bean
public IntegrationFlow ordersFlow() {
return IntegrationFlows.from("ordersChannel")
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> message) {
return ((Order)message.getPayload()).getOrderItems();
}
})
.delay("normalMessage", new Consumer<DelayerEndpointSpec>() {
public void accept(DelayerEndpointSpec spec) {
spec.id("delayChannel");
spec.defaultDelay(50000000);
System.out.println("Going to delay");
}
})
.channel("bookItemsChannel")
.get();
}
在我看来,当您看到 System.out.println("Going to delay");
时,混合了 init
阶段和真正的运行时,当每个传入消息发生延迟时。
我们在 DSL 项目中有一些延迟测试用例,但我刚刚写了这个来证明 defaultDelay
工作正常:
@Bean
public IntegrationFlow ordersFlow() {
return f -> f
.split()
.delay("normalMessage", (DelayerEndpointSpec e) -> e.defaultDelay(5000))
.channel(c -> c.queue("bookItemsChannel"));
}
...
@Autowired
@Qualifier("ordersFlow.input")
private MessageChannel ordersFlowInput;
@Autowired
@Qualifier("bookItemsChannel")
private PollableChannel bookItemsChannel;
@Test
public void ordersDelayTests() {
this.ordersFlowInput.send(new GenericMessage<>(new String[] {"foo", "bar", "baz"}));
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Message<?> receive = this.bookItemsChannel.receive(10000);
assertNotNull(receive);
receive = this.bookItemsChannel.receive(10000);
assertNotNull(receive);
receive = this.bookItemsChannel.receive(10000);
assertNotNull(receive);
stopWatch.stop();
assertThat(stopWatch.getTotalTimeMillis(), greaterThanOrEqualTo(5000L));
}
如您所见,它与您的配置非常接近,但这并不能证明我们在 .delay()
.
所以,一个意想不到的问题,最好提供类似的东西来确认一下。