Spring Cloud Dataflow 将缓冲的消息发送到输出通道

Spring Cloud Dataflow send buffered messages to output channel

我有一个过滤器可以缓冲被拒绝的消息。一旦我的谓词得到满足,我想将这些缓冲消息发送到输出通道:

public class MyFilter implements MessageSelector {

    private Buffer buff;

    ...

    @Override
    public boolean accept(final Message<?> message) {
        if (isAcceptable(message))) {
            for (final Message<?> msg : buff.getBuffered()) {
                // accept these as well
            }
            return true;
        } else {
            buff.put(message);
            return false;
        }
    }
}

如何接受缓冲消息?有没有更好的方法?

我建议 aggregator 和自定义 ReleaseStrategy 是一个很好的方法。当释放策略释放组时,如果输出处理器(@Aggregator 方法以这种方式配置时)returns Message<?> 的集合,它们将作为单独的消息释放。

使用 Java 配置时,您可能会发现使用 Java DSL .aggregate(...) 函数比创建 @Bean 更简单。

目前有一个 pull request 聚合器应用启动器。