Consumer/producer 问题:消耗缓慢暂停生产

Consumer/producer problem: pause production on slow consumption

我有一个 生产者 从磁盘读取文本块。多个 消费者 正在对这些块进行计算。

如果当前正在计算的块超过 n 个,我希望生产者暂停从磁盘读取数据。

已将其放入伪代码中以说明我想要实现的目标。

// "produceBlocks" reads blocks from disk one by one
// and feeds them to lambda
produceBlocks(block -> {
  // (!) if activeCounter exceeds a THRESHOLD, then pause

  executorService.submit(() -> { 
     activeCounter.incrementAndGet();

     // do some work

     activeCounter.decrementAndGet();
  });
});

我会为您的线程池使用固定长度的队列,并在当前线程中 运行 实施 RejectedExecuptionHandler 或暂停并重试。

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RejectedExecutionHandler.html#rejectedExecution(java.lang.Runnable,%20java.util.concurrent.ThreadPoolExecutor)

例如

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

我已经有效地使用了最后一个选项,一旦配置了 ExecutorService,它就不需要额外的代码。

"I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over." 真正的任务描述略有不同:生产者在从磁盘读取数据之前,应该获得这样做的许可。 如果您的生产者是一个线程,那么管理许可的自然设施是 Semaphore。最初它包含 n 个许可证。生产者要读取一个块,需要 Semaphore::aquire 1 个许可。当块被消费者处理时,消费者释放 1 个许可 Semaphore::release.

另一种方法是结合区块和许可。类似于从生产者到消费者的输出队列,为块创建一个输入阻塞队列。最初放在那里 n 块。生产者要读取一个块,首先从该队列中取出下一个块。消费者,处理一个块后,returns它到输入队列。