spring-按分类器整合组消息
spring-integration group messages by classifier
有了 Java 8 个流,我可以按分类器对消息进行分组:
Map<String, List<String>> grouped = Arrays.asList("a", "b", "b", "b", "c")
.stream()
.collect(Collectors.groupingBy(Function.identity()));
我想写一个聚合器来对消息进行相应的分组。在上面显示的带有有效负载的五条消息中,我想生成三条消息:
第一个应该有 "a"
作为有效载荷,第二个应该有一个包含三个 "b"
的列表作为有效载荷,第三个应该有 "c"
作为有效载荷。
在达到序列大小时应释放所有消息组。基于有效负载的分组工作正常,但消息组永远不会被释放。
在发布策略中我可以访问序列大小,但我无法找出已处理的项目总数。如何发布我的分组消息?
public interface StringGrouper {
List<Message<?> groupSame(List<String> toGroup);
}
@Bean
public IntegrationFlow groupStringsFlow() {
return IntegrationFlows.from(StringGrouper.class)
.split()
.aggregate(agg -> agg
.correlationStrategy(message -> message.getPayload())
.releaseStrategy(group -> group.getSequenceSize() == /* what? */))
.logAndReply();
}
@Test
public void shouldGroupMessages {
List<Message<?> grouper
.groupSame(Arrays.asList("a", "b", "b", "b", "c"));
}
解决方法是根本不使用聚合器,而是将传入列表分组到转换器中。但我希望我可以为此使用聚合器。
@Bean
public IntegrationFlow groupStringsFlow() {
return IntegrationFlows.from(StringGrouper.class)
.<List<String>, Collection<List<String>>>transform(source -> source.stream()
.collect(Collectors.collectingAndThen(
Collectors.groupingBy(Function.identity()),
grouped -> grouped.values())))
.split()
.log() // work with messages
.aggregate()
.get();
}
使用默认序列大小释放策略将它们聚合为一个组,并使用自定义输出处理器 (MessageGroupProcessor
) 根据有效负载重新分组,返回 Collection<Message<List<?>>>
.
有了 Java 8 个流,我可以按分类器对消息进行分组:
Map<String, List<String>> grouped = Arrays.asList("a", "b", "b", "b", "c")
.stream()
.collect(Collectors.groupingBy(Function.identity()));
我想写一个聚合器来对消息进行相应的分组。在上面显示的带有有效负载的五条消息中,我想生成三条消息:
第一个应该有 "a"
作为有效载荷,第二个应该有一个包含三个 "b"
的列表作为有效载荷,第三个应该有 "c"
作为有效载荷。
在达到序列大小时应释放所有消息组。基于有效负载的分组工作正常,但消息组永远不会被释放。
在发布策略中我可以访问序列大小,但我无法找出已处理的项目总数。如何发布我的分组消息?
public interface StringGrouper {
List<Message<?> groupSame(List<String> toGroup);
}
@Bean
public IntegrationFlow groupStringsFlow() {
return IntegrationFlows.from(StringGrouper.class)
.split()
.aggregate(agg -> agg
.correlationStrategy(message -> message.getPayload())
.releaseStrategy(group -> group.getSequenceSize() == /* what? */))
.logAndReply();
}
@Test
public void shouldGroupMessages {
List<Message<?> grouper
.groupSame(Arrays.asList("a", "b", "b", "b", "c"));
}
解决方法是根本不使用聚合器,而是将传入列表分组到转换器中。但我希望我可以为此使用聚合器。
@Bean
public IntegrationFlow groupStringsFlow() {
return IntegrationFlows.from(StringGrouper.class)
.<List<String>, Collection<List<String>>>transform(source -> source.stream()
.collect(Collectors.collectingAndThen(
Collectors.groupingBy(Function.identity()),
grouped -> grouped.values())))
.split()
.log() // work with messages
.aggregate()
.get();
}
使用默认序列大小释放策略将它们聚合为一个组,并使用自定义输出处理器 (MessageGroupProcessor
) 根据有效负载重新分组,返回 Collection<Message<List<?>>>
.