spring 与 java DSL 集成中的免费队列通道
Free queue channel in spring integration with java DSL
我有一个存储消息的频道。当新消息到达时,如果服务器尚未处理所有消息(仍在队列中),我需要清除队列(例如,通过将所有数据重新路由到另一个通道)。为此,我使用了路由器。但问题是当新消息到达时,不仅旧消息而且新消息都会重新路由到另一个通道。新消息必须保留在队列中。我怎么解决这个问题?
这是我的代码:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.bridge(e -> e.poller(Pollers.fixedDelay(500, TimeUnit.MILLISECONDS, 1000).maxMessagesPerPoll(1)))
.route(r -> {
if (flag) {
return "mainChannel";
} else {
return "garbageChannel";
}
})
.get();
}
@Bean
public IntegrationFlow outFlow() {
return IntegrationFlows.from("mainChannel")
.handle(m -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(m.getPayload() + "\tmainFlow");
})
.get();
}
@Bean
public IntegrationFlow outGarbage() {
return IntegrationFlows.from("garbageChannel")
.handle(m -> System.out.println(m.getPayload() + "\tgarbage"))
.get();
}
通过按“q”和“e”键通过@GateWay 更改标志值。
我建议您查看 QueueChannel
的 purge
API:
/**
* Remove any {@link Message Messages} that are not accepted by the provided selector.
* @param selector The message selector.
* @return The list of messages that were purged.
*/
List<Message<?>> purge(@Nullable MessageSelector selector);
通过自定义 MessageSelector
,您将能够从 queue 中删除旧消息。请参阅 timestamp
消息 header 进行咨询。使用此方法后,您可以对旧消息做任何需要做的事情。
我有一个存储消息的频道。当新消息到达时,如果服务器尚未处理所有消息(仍在队列中),我需要清除队列(例如,通过将所有数据重新路由到另一个通道)。为此,我使用了路由器。但问题是当新消息到达时,不仅旧消息而且新消息都会重新路由到另一个通道。新消息必须保留在队列中。我怎么解决这个问题? 这是我的代码:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.bridge(e -> e.poller(Pollers.fixedDelay(500, TimeUnit.MILLISECONDS, 1000).maxMessagesPerPoll(1)))
.route(r -> {
if (flag) {
return "mainChannel";
} else {
return "garbageChannel";
}
})
.get();
}
@Bean
public IntegrationFlow outFlow() {
return IntegrationFlows.from("mainChannel")
.handle(m -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(m.getPayload() + "\tmainFlow");
})
.get();
}
@Bean
public IntegrationFlow outGarbage() {
return IntegrationFlows.from("garbageChannel")
.handle(m -> System.out.println(m.getPayload() + "\tgarbage"))
.get();
}
通过按“q”和“e”键通过@GateWay 更改标志值。
我建议您查看 QueueChannel
的 purge
API:
/**
* Remove any {@link Message Messages} that are not accepted by the provided selector.
* @param selector The message selector.
* @return The list of messages that were purged.
*/
List<Message<?>> purge(@Nullable MessageSelector selector);
通过自定义 MessageSelector
,您将能够从 queue 中删除旧消息。请参阅 timestamp
消息 header 进行咨询。使用此方法后,您可以对旧消息做任何需要做的事情。