在 Spring 集成 DSL 中使用带有丢弃通道的过滤器
Using filter with a discard channel in Spring Integration DSL
我不知道这个问题是关于 spring-integration、spring-integration-dsl 还是两者,所以我只是添加了 2 个标签...
我今天花了相当多的时间,先用过滤器做一个简单的流程
StandardIntegrationFlow flow = IntegrationFlows.from(...)
.filter(messagingFilter)
.transform(transformer)
.handle((m) -> {
(...)
})
.get();
messagingFilter 是 MessageSelector 的一个非常简单的实现。到目前为止一切顺利,没有花太多时间。但后来我想记录一条消息,以防 MessageSelector 返回 false,这就是我被卡住的地方。
经过一段时间后,我得到了这个:
StandardIntegrationFlow flow = IntegrationFlows.from(...)
.filter(messagingFilters, fs -> fs.discardFlow( i -> i.channel(discardChannel()))
.transform(transformer)
.handle((m) -> {
(...)
})
.get();
(...)
public MessageChannel discardChannel() {
MessageChannel channel = new MessageChannel(){
@Override
public boolean send(Message<?> message) {
log.warn((String) message.getPayload().get("msg-failure"));
return true;
}
@Override
public boolean send(Message<?> message, long timeout) {
return this.send(message);
}
};
return channel;
}
这既丑陋又冗长,所以问题是,我在这里做错了什么,我应该如何以更好、更干净、更优雅的解决方案来完成?
干杯。
你的问题是你没有看到 Filter
是一个 EI 模式实现,它最多只能将丢弃的消息发送到某个频道。它不会记录任何内容,因为该方法还不是基于消息传递的。
您的用例所需的最简单方法如下:
.discardFlow(df -> df
.handle(message -> log.warn((String) message.getPayload().get("msg-failure")))))
你的逻辑只是记录。其他一些人可能会做更复杂的逻辑。所以,最终你会习惯于端点之间的 channel 抽象。
我同意 new MessageChannel() {}
方法是错误的。日志记录确实应该在 MessageHandler
中完成。那就是服务责任的层次。也不要忘记有 LoggingHandler
,通过 Java DSL 可以实现为:
.filter(messagingFilters, fs -> fs.discardFlow( i -> i.log(message -> (String) message.getPayload().get("msg-failure"))))
我不知道这个问题是关于 spring-integration、spring-integration-dsl 还是两者,所以我只是添加了 2 个标签...
我今天花了相当多的时间,先用过滤器做一个简单的流程
StandardIntegrationFlow flow = IntegrationFlows.from(...)
.filter(messagingFilter)
.transform(transformer)
.handle((m) -> {
(...)
})
.get();
messagingFilter 是 MessageSelector 的一个非常简单的实现。到目前为止一切顺利,没有花太多时间。但后来我想记录一条消息,以防 MessageSelector 返回 false,这就是我被卡住的地方。
经过一段时间后,我得到了这个:
StandardIntegrationFlow flow = IntegrationFlows.from(...)
.filter(messagingFilters, fs -> fs.discardFlow( i -> i.channel(discardChannel()))
.transform(transformer)
.handle((m) -> {
(...)
})
.get();
(...)
public MessageChannel discardChannel() {
MessageChannel channel = new MessageChannel(){
@Override
public boolean send(Message<?> message) {
log.warn((String) message.getPayload().get("msg-failure"));
return true;
}
@Override
public boolean send(Message<?> message, long timeout) {
return this.send(message);
}
};
return channel;
}
这既丑陋又冗长,所以问题是,我在这里做错了什么,我应该如何以更好、更干净、更优雅的解决方案来完成?
干杯。
你的问题是你没有看到 Filter
是一个 EI 模式实现,它最多只能将丢弃的消息发送到某个频道。它不会记录任何内容,因为该方法还不是基于消息传递的。
您的用例所需的最简单方法如下:
.discardFlow(df -> df
.handle(message -> log.warn((String) message.getPayload().get("msg-failure")))))
你的逻辑只是记录。其他一些人可能会做更复杂的逻辑。所以,最终你会习惯于端点之间的 channel 抽象。
我同意 new MessageChannel() {}
方法是错误的。日志记录确实应该在 MessageHandler
中完成。那就是服务责任的层次。也不要忘记有 LoggingHandler
,通过 Java DSL 可以实现为:
.filter(messagingFilters, fs -> fs.discardFlow( i -> i.log(message -> (String) message.getPayload().get("msg-failure"))))