Java - 收集消费者?

Java - Collecting consumer?

有没有一种功能性的方法可以先收集流中的元素,然后立即将集合传递给消费者?换句话说,等待流结束,然后将终端操作作为集合而不是一个接一个地应用于所有流元素的构造?

例如,您能否将以下内容实现为单行代码:

Stream<Event> stream = // a stream of events
List<Event> list = stream.collect(Collectors.toList());
doProcessEvents(list);

作为一种解决方法,我可以(ab)使用 Collectors.collectingAndThen()Function 来实现我正在寻找的结果:

Function<List<Event>, Void> processingFunction = (list -> {
    doProcessEvents(list);
    return null;
});

Stream<Event> stream = // a stream of events
stream.collect(Collectors.collectingAndThen(Collectors.toList(), processingFunction);

我考虑过的其他替代方案(但行不通)是 collectingAndThen() 方法是否有一个 Consumer 作为第二个参数,例如Collectors.collectingAndThen(Collector downstream, Consumer consumer) 或如果 Consumer 接口有一个 finish() 方法,该方法在流中的最后一个元素被消耗后执行:

class EventConsumer implements Consumer<Event> {
    private final List<Event> list = new LinkedList<>();

    @Override
    public void accept(Event ev) {
        events.add(ev);
    }

    public void finish() {
        doProcessEvents(events);
    }
}

// usage
Stream<Event> stream = // a stream of events
stream.forEach(new EventConsumer());

这种方法的问题是事件将保存在内部列表中,但不会调用 finish() 方法。它稍作修改即可工作,但仍然没有单行:

Stream<Event> stream = // a stream of events
EventConsumer consumer = new EventConsumer()
stream.forEach(consumer);
consumer.finish();

Is there a functional way to first collect the elements of a stream and then immediately pass the collection to a consumer?

直接调用Consumer.accept(T t)方法即可:

Stream<Event> stream = ...
Consumer<List<Event>> consumer = ...

consumer.accept(stream.collect(Collectors.toList()));

最后,我决定采用不同的方法。我没有尝试将收集器和消费者硬塞在一起,而是创建了一个界面

interface StreamConsumer<T> {
   void consume(Stream<T> stream);
}

并将代码重构为


void processEvents(StreamConsumer<Events> streamConsumer, Stream<Event> events) {
    streamConsumer.consume(events);
}

这样做的目的是什么?现在,我可以实现不同类型的消费者,一些依赖 List<Event>,另一些可以消费 Stream<Event>,例如

class ListEventConsumer implements StreamConsumer<Event> {

    private ListProcessor<Event> listProcessor;

    ListEventConsumer(ListProcessor<Event> listProcessor) {
       this.listProcessor = listProcessor;
    }

    @Override
    void consume(Stream<Event> events) {
        List<Event> list = events.collect(Collectors.toList());
        listProcess.process(list);
    }
}

以及标准 Java Consumer 实现,例如

class FunctionalEventConsumer implements StreamConsumer<Event> {

    private Consumer<Event> consumer;

    FunctionalEventConsumer(Consumer<Event> consumer) {
        this.consumer = consumer;
    }

    @Override
    void consume(Stream<Event> events) {
        events.forEach(consumer);
    }
}

现在,我有一种方法既可以将 Event 作为 Stream<Event> 使用,也可以首先将它们收集到 ListEventConsumer class 中,然后将它们委托给 ListProcessor<Event>。使用这种方法,委托事件处理的 class 不需要知道使用了哪种 StreamConsumer<Event>

// either a ListEventConsumer or a FunctionalConsumer
StreamConsumer<Event> streamConsumer = // ...
Stream<Event> events = // ...

processEvents(streamConsumer, events);