使用多个线程从阻塞队列中读取
Reading from blocking queue with multiple threads
我有一个使用阻塞队列的生产者-消费者模型,其中 4 个线程从目录读取文件并将其放入阻塞队列,4 个线程(消费者)从阻塞队列读取。
我的问题是每次只有一个消费者从 Blockingqueue 中读取而其他 3 个消费者线程没有读取:
final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
CompletableFuture<Void> completableFutureProducer = produceUrls(files, queue, checker);
//not providing code for produceData , it is working file with all 4 //threads writing to Blocking queue. Here is the consumer code.
private CompletableFuture<Validator> consumeData(
final Response checker,
final CompletableFuture<Void> urls
) {
return CompletableFuture.supplyAsync(checker, 4)
.whenComplete((result, err) -> {
if (err != null) {
LOG.error("consuming url worker failed!", err);
urls.cancel(true);
}
});
}
completableFutureProducer.join();
completableFutureConsumer.join();
这是我的代码。有人可以告诉我我做错了什么吗?或帮助正确的代码。
为什么一个消费者正在从阻塞队列中读取。
为响应添加代码 class 从阻塞队列中读取:
@Slf4j
public final class Response implements Supplier<Check> {
private final BlockingQueue<byte[]> data;
private final AtomicBoolean producersComplete;
private final Calendar calendar = Calendar.getInstance();
public ResponseCode(
final BlockingQueue<byte[]> data
) {
this.data = data;
producersDone = new AtomicBoolean();
}
public void notifyProducersDone() {
producersComplete.set(true);
}
@Override
public Check get() {
try {
Check check = null;
try {
while (!data.isEmpty() || !producersDone.get()) {
final byte[] item = data.poll(1, TimeUnit.SECONDS);
if (item != null) {
LOG.info("{}",new String(item));
// I see only one thread printing result here .
validator = validateData(item);
}
}
} catch (InterruptedException | IOException e) {
Thread.currentThread().interrupt();
throw new WriteException("Exception occurred while data validation", e);
}
return check;
} finally {
LOG.info("Done reading data from BlockingQueue");
}
}
}
仅凭这一点很难诊断,但检查 data.isEmpty()
可能是不正确的,因为队列可能恰好暂时为空(但稍后会获取项目)。因此,您的线程可能会在遇到临时空队列时立即退出。
相反,如果生产者已完成并且您从 poll
得到空结果,您可以退出。这样线程只会在确实没有更多项目要处理时退出。
虽然您要返回最后一项(单独)的结果,但这有点奇怪。你确定这是你想要的吗?
编辑: 我最近做了一些非常相似的事情。 Here 是一个 class,它从文件中读取,以 multi-threaded 方式转换行,然后写入不同的文件(保留行的顺序)。
它还使用 BlockingQueue
。它与您的代码非常相似,但由于上述原因,它不会检查 quue.isEmpty()
。它对我来说很好用。
假设队列中有 1 个项目和 4 个消费者,其中一个将轮询项目呈现队列为空。然后其余 3 个消费者检查是否 queue.isEmpty()
,因为它是 - 退出循环。
4+4线程不算多,最好不要用CompletableFuture之类的异步工具。简单的多线程程序会更简单,运行速度更快。
有
BlockingQueue<byte[]> data;
不要使用 data.poll()
;
使用data.take();
我有一个使用阻塞队列的生产者-消费者模型,其中 4 个线程从目录读取文件并将其放入阻塞队列,4 个线程(消费者)从阻塞队列读取。
我的问题是每次只有一个消费者从 Blockingqueue 中读取而其他 3 个消费者线程没有读取:
final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
CompletableFuture<Void> completableFutureProducer = produceUrls(files, queue, checker);
//not providing code for produceData , it is working file with all 4 //threads writing to Blocking queue. Here is the consumer code.
private CompletableFuture<Validator> consumeData(
final Response checker,
final CompletableFuture<Void> urls
) {
return CompletableFuture.supplyAsync(checker, 4)
.whenComplete((result, err) -> {
if (err != null) {
LOG.error("consuming url worker failed!", err);
urls.cancel(true);
}
});
}
completableFutureProducer.join();
completableFutureConsumer.join();
这是我的代码。有人可以告诉我我做错了什么吗?或帮助正确的代码。 为什么一个消费者正在从阻塞队列中读取。
为响应添加代码 class 从阻塞队列中读取:
@Slf4j
public final class Response implements Supplier<Check> {
private final BlockingQueue<byte[]> data;
private final AtomicBoolean producersComplete;
private final Calendar calendar = Calendar.getInstance();
public ResponseCode(
final BlockingQueue<byte[]> data
) {
this.data = data;
producersDone = new AtomicBoolean();
}
public void notifyProducersDone() {
producersComplete.set(true);
}
@Override
public Check get() {
try {
Check check = null;
try {
while (!data.isEmpty() || !producersDone.get()) {
final byte[] item = data.poll(1, TimeUnit.SECONDS);
if (item != null) {
LOG.info("{}",new String(item));
// I see only one thread printing result here .
validator = validateData(item);
}
}
} catch (InterruptedException | IOException e) {
Thread.currentThread().interrupt();
throw new WriteException("Exception occurred while data validation", e);
}
return check;
} finally {
LOG.info("Done reading data from BlockingQueue");
}
}
}
仅凭这一点很难诊断,但检查 data.isEmpty()
可能是不正确的,因为队列可能恰好暂时为空(但稍后会获取项目)。因此,您的线程可能会在遇到临时空队列时立即退出。
相反,如果生产者已完成并且您从 poll
得到空结果,您可以退出。这样线程只会在确实没有更多项目要处理时退出。
虽然您要返回最后一项(单独)的结果,但这有点奇怪。你确定这是你想要的吗?
编辑: 我最近做了一些非常相似的事情。 Here 是一个 class,它从文件中读取,以 multi-threaded 方式转换行,然后写入不同的文件(保留行的顺序)。
它还使用 BlockingQueue
。它与您的代码非常相似,但由于上述原因,它不会检查 quue.isEmpty()
。它对我来说很好用。
假设队列中有 1 个项目和 4 个消费者,其中一个将轮询项目呈现队列为空。然后其余 3 个消费者检查是否 queue.isEmpty()
,因为它是 - 退出循环。
4+4线程不算多,最好不要用CompletableFuture之类的异步工具。简单的多线程程序会更简单,运行速度更快。
有
BlockingQueue<byte[]> data;
不要使用 data.poll()
;
使用data.take();