如何在 Spring Integration Java DSL 中自定义消息聚合逻辑
How to customize message aggregation logic in Spring Integration Java DSL
在集成流中,使用默认策略的拆分会从列表中发出一个项目。该项目的处理可能会失败。我想处理该错误并将包含前一条消息的映射信息(除了自定义错误 header)的新消息定向到正常的消息传递通道。
在聚合器中,我想自定义聚合逻辑以生成其他类型的消息,其中包含失败进程的计数和未失败消息的结果。
这里我解释一下我是如何用 header:
发送错误信息的
@Bean
public IntegrationFlow socialMediaErrorFlow() {
return IntegrationFlows.from("socialMediaErrorChannel")
.wireTap(sf -> sf.handle("errorService", "handleException"))
.<MessagingException>handle((p, h)
-> MessageBuilder.withPayload(Collections.<CommentEntity>emptyList())
.copyHeaders(p.getFailedMessage().getHeaders())
.setHeader("ERROR", true)
.build()
)
.channel("directChannel_1")
.get();
}
我希望聚合器生成此类型的 object:
public class Result {
private Integer totalTask;
private Integer taskFailed;
private List<CommentEntity> comments;
}
我该如何处理?
提前致谢。
感谢 Artem 的帮助,我完成了这个实现:
.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() {
@Override
public Object processMessageGroup(MessageGroup mg) {
Integer failedTaskCount = 0;
Integer totalTaskCount = mg.getMessages().size();
List<CommentEntity> comments = new ArrayList<>();
for(Message<?> message: mg.getMessages()){
if(message.getHeaders().containsKey("ERROR"))
failedTaskCount++;
else
comments.addAll((List<CommentEntity>)message.getPayload());
}
return new IterationResult(totalTaskCount, failedTaskCount, comments);
}
}))
AggregatorSpec
有outputProcessor
属性:
/**
* A processor to determine the output message from the released group. Defaults to a message
* with a payload that is a collection of payloads from the input messages.
* @param outputProcessor the processor.
* @return the aggregator spec.
*/
public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) {
在这里您可以提供自己的自定义逻辑来解析群组中的所有消息并为它们构建您的 Result
。
来自测试用例的样本:
.aggregate(a -> a.outputProcessor(g -> g.getMessages()
.stream()
.map(m -> (String) m.getPayload())
.collect(Collectors.joining(" "))))
Cafe Demo 样本:
.aggregate(aggregator -> aggregator
.outputProcessor(g ->
new Delivery(g.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList())))
.correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber()))
在集成流中,使用默认策略的拆分会从列表中发出一个项目。该项目的处理可能会失败。我想处理该错误并将包含前一条消息的映射信息(除了自定义错误 header)的新消息定向到正常的消息传递通道。
在聚合器中,我想自定义聚合逻辑以生成其他类型的消息,其中包含失败进程的计数和未失败消息的结果。
这里我解释一下我是如何用 header:
发送错误信息的@Bean
public IntegrationFlow socialMediaErrorFlow() {
return IntegrationFlows.from("socialMediaErrorChannel")
.wireTap(sf -> sf.handle("errorService", "handleException"))
.<MessagingException>handle((p, h)
-> MessageBuilder.withPayload(Collections.<CommentEntity>emptyList())
.copyHeaders(p.getFailedMessage().getHeaders())
.setHeader("ERROR", true)
.build()
)
.channel("directChannel_1")
.get();
}
我希望聚合器生成此类型的 object:
public class Result {
private Integer totalTask;
private Integer taskFailed;
private List<CommentEntity> comments;
}
我该如何处理?
提前致谢。
感谢 Artem 的帮助,我完成了这个实现:
.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() {
@Override
public Object processMessageGroup(MessageGroup mg) {
Integer failedTaskCount = 0;
Integer totalTaskCount = mg.getMessages().size();
List<CommentEntity> comments = new ArrayList<>();
for(Message<?> message: mg.getMessages()){
if(message.getHeaders().containsKey("ERROR"))
failedTaskCount++;
else
comments.addAll((List<CommentEntity>)message.getPayload());
}
return new IterationResult(totalTaskCount, failedTaskCount, comments);
}
}))
AggregatorSpec
有outputProcessor
属性:
/**
* A processor to determine the output message from the released group. Defaults to a message
* with a payload that is a collection of payloads from the input messages.
* @param outputProcessor the processor.
* @return the aggregator spec.
*/
public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) {
在这里您可以提供自己的自定义逻辑来解析群组中的所有消息并为它们构建您的 Result
。
来自测试用例的样本:
.aggregate(a -> a.outputProcessor(g -> g.getMessages()
.stream()
.map(m -> (String) m.getPayload())
.collect(Collectors.joining(" "))))
Cafe Demo 样本:
.aggregate(aggregator -> aggregator
.outputProcessor(g ->
new Delivery(g.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList())))
.correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber()))