一个上游流供给多个下游流

One upstream stream feeding multiple downstream streams

我有一个一般的 Streams API 问题想要解决 "efficiently"。假设我有一个(可能非常大,可能是无限的)流。我想以某种方式对其进行预处理,例如,过滤掉一些项目,并对一些项目进行变异。假设这种预处理很复杂、时间和计算密集型,所以我不想做两次。

接下来我想对项目序列执行两组不同的操作,并使用不同的流类型构造处理每个不同序列的远端。对于无限流,这将是一个 forEach,对于有限流,它可能是一个收集器或其他任何东西。

显然,我可能会将中间结果收集到一个列表中,然后将两个单独的流拖出该列表,分别处理每个流。这适用于有限流,尽管 a) 看起来 "ugly" 和 b) 对于非常大的流可能不切实际,而且完全不适用于无限流。

我想我可以使用 peek 作为一种 "tee"。然后我可以对 peek 下游的结果执行一个处理链,并以某种方式强迫 peek 中的消费者执行 "other" 工作,但现在第二条路径不再是流。

我发现我可以创建一个 BlockingQueue,使用 peek 将东西推入该队列,然后从队列中获取一个流。这似乎是个好主意,实际上效果很好,尽管我不明白流是如何关闭的(它确实关闭了,但我看不到如何关闭)。下面是说明这一点的示例代码:

List<Student> ls = Arrays.asList(
  new Student("Fred", 2.3F)
  // more students (and Student definition) elided ...
);

BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();

ls.stream()
  .peek(s -> {
     try {
       pipe.put(s);
     } catch (InterruptedException ioe) {
       ioe.printStackTrace();
     }
   })
   .forEach(System.out::println);

   new Thread(
     new Runnable() {
       public void run() {
         Map<String, Double> map = 
           pipe.stream()
             .collect(Collectors.groupingBy(s->s.getName(),
                      Collectors.averagingDouble(s->s.getGpa())));
         map.forEach(
           (k,v)->
             System.out.println(
               "Students called " + k 
               + " average " + v));

       }
     }).start();

因此,第一个问题是:是否有 "better" 方法来做到这一点?

第二个问题,BlockingQueue 上的流到底是怎么关闭的?

干杯, 托比

有趣的问题。我先回答第二个问题,因为这个问题比较简单

Second question, how the heck is that stream on the BlockingQueue getting closed?

"closed" 我想你的意思是,流有一定数量的元素然后结束,忽略将来可能添加到队列中的任何元素。原因是队列上的流仅表示创建流时队列的 当前内容。它不代表任何未来元素,即其他线程将来可能添加的元素。

如果您想要一个表示队列当前和未来内容的流,那么您可以使用本文 other answer 中描述的技术。基本上用Stream.generate()调用queue.take()。不过我不认为这是你想要做的,所以我不会在这里进一步讨论它。

现在谈谈你的大问题。

您有一个对象源,您希望对其进行一些处理,包括过滤。然后您想要获取结果并将它们发送到两个不同的下游处理步骤。本质上你有一个生产者和两个消费者。

您必须处理的基本问题之一是如何处理不同处理步骤以不同速率发生的情况。假设我们已经解决了如何从队列中获取流而不会使流过早终止的问题。如果生产者生产元素的速度快于消费者处理此队列中元素的速度,则队列将累积元素,直到填满所有可用内存。

您还必须决定如何以不同的速率处理不同的消费者处理元素。如果一个消费者比另一个消费者慢得多,则可能需要缓冲任意数量的元素(这可能会填满内存),或者必须减慢较快的消费者的速度以匹配较慢的消费者的平均速率。

让我勾勒出您可能如何进行的草图。不过,我不知道您的实际要求,所以我不知道这是否令人满意。需要注意的一件事是,在这种应用程序中使用并行流可能会出现问题,因为并行流不能很好地处理阻塞和负载平衡。

首先,我将从生产者的流处理元素开始并将它们累积到 ArrayBlockingQueue:

BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
producer.map(...)
        .filter(...)
        .forEach(queue::put);

(注意put会抛出InterruptedException,所以你不能只把queue::put放在这里,你必须在这里放一个try-catch块,或者写一个辅助方法相反。但是如果 InterruptedException 被抓到该怎么办并不明显。)

如果队列填满,这将阻塞管道。 运行 这在它自己的线程中顺序进行,或者如果是并行的,在专用线程池中,以避免阻塞公共池。

接下来,消费者:

while (true) {
    // wait until the queue is full, or a timeout has expired,
    // depending upon how frequently you want to continue
    // processing elements emitted by the producer
    List<T> list = new ArrayList<>();
    queue.drainTo(list);
    downstream1 = list.stream().filter(...).map(...).collect(...);
    downstream2 = list.stream().filter(...).map(...).collect(...);
    // deal with results downstream1 and downstream2
}

这里的关键是从生产者到消费者的切换是使用 drainTo 方法分批完成的,该方法将队列的元素添加到目的地并自动清空队列。这样,消费者就不必等待生产者完成其处理(如果它是无限的,则不会发生)。此外,消费者正在对已知数量的数据进行操作,并且不会在处理过程中阻塞。因此,如果有帮助,每个消费者流都可以 运行 并行。

在这里,我让消费者​​ 运行 步调一致。如果您希望消费者以不同的速率 运行,则必须构建额外的队列(或其他队列)来独立缓冲他们的工作负载。

如果消费者总体上比生产者慢,队列最终会填满并被阻塞,从而使生产者的速度减慢到消费者可以接受的速度。如果消费者平均比生产者快,那么也许你不需要担心消费者的相对处理速度。您可以让它们循环并获取生产者设法放入队列中的任何内容,或者甚至让它们阻塞直到有可用的东西。

我应该说我所概述的是一种非常简单的多级流水线方法。如果您的应用程序对性能至关重要,您可能会发现自己做了大量工作来调整内存消耗、负载平衡、增加吞吐量和减少延迟。还有其他框架可能更适合您的应用程序。例如,您可能会看一下 LMAX Disruptor,尽管我自己没有任何经验。