Consumer/producer 问题:消耗缓慢暂停生产
Consumer/producer problem: pause production on slow consumption
我有一个 生产者 从磁盘读取文本块。多个 消费者 正在对这些块进行计算。
如果当前正在计算的块超过 n 个,我希望生产者暂停从磁盘读取数据。
已将其放入伪代码中以说明我想要实现的目标。
// "produceBlocks" reads blocks from disk one by one
// and feeds them to lambda
produceBlocks(block -> {
// (!) if activeCounter exceeds a THRESHOLD, then pause
executorService.submit(() -> {
activeCounter.incrementAndGet();
// do some work
activeCounter.decrementAndGet();
});
});
我会为您的线程池使用固定长度的队列,并在当前线程中 运行 实施 RejectedExecuptionHandler 或暂停并重试。
例如
我已经有效地使用了最后一个选项,一旦配置了 ExecutorService,它就不需要额外的代码。
"I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over."
真正的任务描述略有不同:生产者在从磁盘读取数据之前,应该获得这样做的许可。
如果您的生产者是一个线程,那么管理许可的自然设施是 Semaphore。最初它包含 n 个许可证。生产者要读取一个块,需要 Semaphore::aquire
1 个许可。当块被消费者处理时,消费者释放 1 个许可 Semaphore::release
.
另一种方法是结合区块和许可。类似于从生产者到消费者的输出队列,为块创建一个输入阻塞队列。最初放在那里 n 块。生产者要读取一个块,首先从该队列中取出下一个块。消费者,处理一个块后,returns它到输入队列。
我有一个 生产者 从磁盘读取文本块。多个 消费者 正在对这些块进行计算。
如果当前正在计算的块超过 n 个,我希望生产者暂停从磁盘读取数据。
已将其放入伪代码中以说明我想要实现的目标。
// "produceBlocks" reads blocks from disk one by one
// and feeds them to lambda
produceBlocks(block -> {
// (!) if activeCounter exceeds a THRESHOLD, then pause
executorService.submit(() -> {
activeCounter.incrementAndGet();
// do some work
activeCounter.decrementAndGet();
});
});
我会为您的线程池使用固定长度的队列,并在当前线程中 运行 实施 RejectedExecuptionHandler 或暂停并重试。
例如
我已经有效地使用了最后一个选项,一旦配置了 ExecutorService,它就不需要额外的代码。
"I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over."
真正的任务描述略有不同:生产者在从磁盘读取数据之前,应该获得这样做的许可。
如果您的生产者是一个线程,那么管理许可的自然设施是 Semaphore。最初它包含 n 个许可证。生产者要读取一个块,需要 Semaphore::aquire
1 个许可。当块被消费者处理时,消费者释放 1 个许可 Semaphore::release
.
另一种方法是结合区块和许可。类似于从生产者到消费者的输出队列,为块创建一个输入阻塞队列。最初放在那里 n 块。生产者要读取一个块,首先从该队列中取出下一个块。消费者,处理一个块后,returns它到输入队列。