阻塞队列轮询方法在多个线程期间返回 null
Blocking Queue poll method retuns null during multiple threads
我们已经使用 blockingqueue、executor 框架和 future 在我们的应用程序中实现了多线程。
每当用户请求一些数据时,我们都会向连接到数据库的执行器框架提交任务,查询数据并将数据流回。
我们有一个方法,它读取这些数据并写入 servletoutputstrean。
public long writeData(ServletOutputStream sos, BlockingQueue < T > blockingQueue, Future < Boolean > future) {
try (final JsonWriter writer = new JsonWriter(new OutputStreamWriter(sos, UTF_8))) {
int counter = 0;
while (true) {
Optional.ofNullable(blockingqueue.poll()).ifPresent(entityobj - > {
gson.toJson(entityobj, entityobj.getclass(), writer);
Counter++;
});
if (blockingqueue.isEmpty() && future.isDone() && future.get()) {
if (count == 0) {
Log.error("data not read properly");
}
break;
}
}
} catch (Exception e) {
log.error(e);
}
return counter;
}
当blockingqueue.poll()被执行时,有时数据仍然没有被repositorythread加载。到下一个 if 块到来时,阻塞队列为空且 future 已完成,因此控制脱离了 while 循环。
没有响应写入流。无论如何要处理这种奇怪的行为
当有很多记录时,不会发生这种情况。
我认为 writeData
正在作为等待数据显示在 BlockingQueue
上然后“写入”数据的工作人员运行。您的问题是,有时将数据插入 BlockingQueue
的线程(此处未显示代码)需要一段时间才能从数据的来源加载数据,导致您的 writeData
工作人员认为它是完成并退出。
最好的解决方案是将一些“流结束”对象写入 BlockingQueue
以在没有更多数据时发出信号,而不是依赖于队列中没有数据。
您不知道 BlockingQueue<T>
中对象的确切类型(它只是 T
),因此有几种方法可以做到这一点。
第一种方法是让任何代码提供 T
也提供一个 T
的实例,它将代表流的结尾,您可以使用 equals
甚至检查相等运算符 ==
。这是否实用将取决于您拥有哪种 T
。
另一种选择是将其设为 BlockingQueue<Optional<T>>
并让另一个线程在没有更多数据时将 Optional.empty()
写入队列。
在任何一种情况下,您都可以使用超时值调用 poll()
并在超时时采取适当的操作(失败,检查插入队列的线程是否仍然存在,等等)。
When blockingqueue.poll() is being executed, there are times data is still not loaded by the repositorythread. By the time the next if block comes, blocking queue is empty and future is completed so control gets out of while loop.
您的代码中存在竞争条件,这让事情变得复杂:
if (blockingqueue.isEmpty() && /* race here */ future.isDone() && future.get()) {
有可能 blockingqueue.isEmpty()
returns 为真,然后作业在 future.isDone()
调用发生之前完成,导致代码过早退出并在队列中留下一个元素。
@Willis 提到的“流结束”对象是一个不错的选择,但一个简单的解决方案是重构您的代码,例如:
boolean futureDone = future.isDone();
entity = blockingqueue.poll();
if (entity == null) {
if (futureDone) {
break;
}
} else {
// process entity
}
这确保您始终检查未来是否完成在从阻塞队列中获取最后一项以消除竞争之前。您可能需要再做一次民意调查,但没关系。
顺便说一句,如果这段代码真的在旋转,你应该在 poll 中设置某种超时来减慢速度而不是吃 CPU:
entity = blockingqueue.poll(50, TimeUnit.MILLISECONDS);
我们已经使用 blockingqueue、executor 框架和 future 在我们的应用程序中实现了多线程。
每当用户请求一些数据时,我们都会向连接到数据库的执行器框架提交任务,查询数据并将数据流回。
我们有一个方法,它读取这些数据并写入 servletoutputstrean。
public long writeData(ServletOutputStream sos, BlockingQueue < T > blockingQueue, Future < Boolean > future) {
try (final JsonWriter writer = new JsonWriter(new OutputStreamWriter(sos, UTF_8))) {
int counter = 0;
while (true) {
Optional.ofNullable(blockingqueue.poll()).ifPresent(entityobj - > {
gson.toJson(entityobj, entityobj.getclass(), writer);
Counter++;
});
if (blockingqueue.isEmpty() && future.isDone() && future.get()) {
if (count == 0) {
Log.error("data not read properly");
}
break;
}
}
} catch (Exception e) {
log.error(e);
}
return counter;
}
当blockingqueue.poll()被执行时,有时数据仍然没有被repositorythread加载。到下一个 if 块到来时,阻塞队列为空且 future 已完成,因此控制脱离了 while 循环。
没有响应写入流。无论如何要处理这种奇怪的行为 当有很多记录时,不会发生这种情况。
我认为 writeData
正在作为等待数据显示在 BlockingQueue
上然后“写入”数据的工作人员运行。您的问题是,有时将数据插入 BlockingQueue
的线程(此处未显示代码)需要一段时间才能从数据的来源加载数据,导致您的 writeData
工作人员认为它是完成并退出。
最好的解决方案是将一些“流结束”对象写入 BlockingQueue
以在没有更多数据时发出信号,而不是依赖于队列中没有数据。
您不知道 BlockingQueue<T>
中对象的确切类型(它只是 T
),因此有几种方法可以做到这一点。
第一种方法是让任何代码提供 T
也提供一个 T
的实例,它将代表流的结尾,您可以使用 equals
甚至检查相等运算符 ==
。这是否实用将取决于您拥有哪种 T
。
另一种选择是将其设为 BlockingQueue<Optional<T>>
并让另一个线程在没有更多数据时将 Optional.empty()
写入队列。
在任何一种情况下,您都可以使用超时值调用 poll()
并在超时时采取适当的操作(失败,检查插入队列的线程是否仍然存在,等等)。
When blockingqueue.poll() is being executed, there are times data is still not loaded by the repositorythread. By the time the next if block comes, blocking queue is empty and future is completed so control gets out of while loop.
您的代码中存在竞争条件,这让事情变得复杂:
if (blockingqueue.isEmpty() && /* race here */ future.isDone() && future.get()) {
有可能 blockingqueue.isEmpty()
returns 为真,然后作业在 future.isDone()
调用发生之前完成,导致代码过早退出并在队列中留下一个元素。
@Willis 提到的“流结束”对象是一个不错的选择,但一个简单的解决方案是重构您的代码,例如:
boolean futureDone = future.isDone();
entity = blockingqueue.poll();
if (entity == null) {
if (futureDone) {
break;
}
} else {
// process entity
}
这确保您始终检查未来是否完成在从阻塞队列中获取最后一项以消除竞争之前。您可能需要再做一次民意调查,但没关系。
顺便说一句,如果这段代码真的在旋转,你应该在 poll 中设置某种超时来减慢速度而不是吃 CPU:
entity = blockingqueue.poll(50, TimeUnit.MILLISECONDS);