java 中的并发队列消耗之谜
Concurrent queue consumption enigma in java
尝试使用 Java OpenCV 加快我的图像处理速度,我尝试使用并行流来使用 OpenCV <Mat>
队列。如果我对算法计时并计算队列中剩余的内容,则在并行处理流时会得到不连贯的结果,但顺序计算结果是正确的。由于我使用了 ConcurrentLinkedQueue()
,我认为我在线程安全和异步性方面都做得很好,但显然不是。有谁知道如何绕过这个问题吗?
备注:
- 元素在其消费期间仍被放入队列
- 我是运行4实(8虚)核处理器
顺序流的结果:
frame collection start size (=production): 1455
frame collection end size (=production - consumption): 1360
resulting list size after algorithm run (=consumption): 100
algorithm: 6956 ms
并行流的结果:
frame collection start size (=production): 1455
frame collection end size (=production - consumption): 440
resulting list size after algorithm run (=consumption): 100
algorithm: 9242 ms
我的代码:
public class OvusculeTestConcurrent {
public final static ConcurrentLinkedQueue<Mat> frameCollection = new ConcurrentLinkedQueue<Mat>();
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
final String path = "C:\Users\Raoul\workspace\aorta2\ressource\artery_src_for_dual.avi";
long startAlgoTime = System.nanoTime();
// Constitute a frame collection in async mode
Capture cap = new Capture(path, frameCollection);
new Thread(cap).start();
Thread.sleep(3000); //leaves time to accumulate frames
System.out.println("frame collection start size (=production): "+frameCollection.size());
//Consumes the current queue in parallel/sequential
List<ImagePlus> lm = Stream.generate(() -> {
return frameCollection.poll();
})
.parallel() // comment to disable parallel computing
.limit(100L)
.map(img -> utils.PrepareImage(img,
new Point(300, 250),
new Point(450, 250),
new Point(400, 400),
0.25))
.collect(Collectors.toList());
//timing & printing the results
long endAlgoTime = System.nanoTime();
long algoDuration = (endAlgoTime - startAlgoTime)/1_000_000; //divide by 1_000_000 to get milliseconds.
System.out.println("frame collection end size (=production - consumption): "+frameCollection.size());
System.out.println("resulting list size after algorithm run (=consumption): "+lm.size());
System.out.println("algorithm: "+algoDuration+" ms");
System.exit(0);
}
}
代码中的一些事情引起了我的注意。
首先,使用 Stream.generate
创建流是正确的。最好只调用 queue.stream()
,这将 return 一个仅包含 队列当前内容 的流。你说元素在处理过程中被添加到队列中,所以那是行不通的。
一个问题是生成这样的流的代码(为清楚起见进行了编辑):
Stream.generate(() -> queue.poll())
问题出在poll
方法上,定义如下:
Retrieves and removes the head of this queue, or returns null if this queue is empty.
可能是当流 运行 并行时,流的线程可以比元素生成和插入队列更快地排空队列。如果发生这种情况,队列将在某个点清空并且流将填充 null
个元素 return poll
。
我不确定 PrepareImage
在传递 null 时做了什么,但它似乎将某些东西传递给了输出,这就是为什么你总是在目标列表中获得 100 个元素。
另一种方法是使用 BlockingQueue
实现并使用 take
方法,例如
Stream.generate(() -> queue.take())
这将避免向流中注入空值。我不确定您应该使用哪种 BlockingQueue
实现,但我建议您研究一个有大小限制的实现。如果您的生产者超过您的消费者,无界队列可能会扩展以填满所有可用内存。
不幸的是,BlockingQueue.take()
抛出 InterruptedException
所以你不能在简单的 lambda 中使用它。你必须弄清楚在中断时该怎么做。也许 return 一个虚拟元素或其他东西。
另一个问题是limit
方法对传递给下游的元素数量施加了限制,但在并行流中,多个线程可能会机会性地拉取超过流中要处理的元素数。这些由 limit
操作缓冲,直到达到其限制,此时流处理终止。从流源中提取并在达到限制时缓冲的任何元素都将被简单地丢弃。这可能就是为什么从队列中拉出超过 1,000 个元素,但最终只有 100 个元素出现在结果列表中的原因。
(但即使在连续的情况下,数字也不会相加。我不认为会发生同样的事情,当达到限制时缓冲元素被丢弃。也许是因为额外的加工过程中产生的元素?)
如果您可以忍受元素被丢弃,那么由 queue.take()
提供的并行流可能会起作用;否则,需要采用不同的方法。
我遇到过类似的问题,但在网络上找不到解决方案。想出我自己的 Spliterator
,看起来 SpliteratorWithUnknownSize 不支持并行性(没有研究,只是看到所有处理都在同一个线程中完成)。这是:
public class QueueDrainSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> elements;
public QueueDrainSpliterator(BlockingQueue<T> elements) {
this.elements = elements;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T el = elements.poll();
if (el != null) {
action.accept(el);
return true;
}
return false;
}
@Override
public Spliterator<T> trySplit() {
if (!elements.isEmpty()) {
BlockingQueue<T> split = new LinkedBlockingQueue<T>();
elements.drainTo(split, (int) Math.ceil(elements.size() / 2d));
return new QueueDrainSpliterator<T>(split);
}
return null;
}
@Override
public long estimateSize() {
return elements.size();
}
@Override
public int characteristics() {
return Spliterator.NONNULL | Spliterator.CONCURRENT;
}
}
用法示例:
StreamSupport.stream(new QueueDrainSpliterator<>(events), true)
.forEach(consumer);
当资源在队列中时,它将遍历资源。如果队列为空,它将完成。如果您需要等待生成元素,您可以尝试使用 elements.poll(long timeout, TimeUnit unit)
尝试使用 Java OpenCV 加快我的图像处理速度,我尝试使用并行流来使用 OpenCV <Mat>
队列。如果我对算法计时并计算队列中剩余的内容,则在并行处理流时会得到不连贯的结果,但顺序计算结果是正确的。由于我使用了 ConcurrentLinkedQueue()
,我认为我在线程安全和异步性方面都做得很好,但显然不是。有谁知道如何绕过这个问题吗?
备注:
- 元素在其消费期间仍被放入队列
- 我是运行4实(8虚)核处理器
顺序流的结果:
frame collection start size (=production): 1455
frame collection end size (=production - consumption): 1360
resulting list size after algorithm run (=consumption): 100
algorithm: 6956 ms
并行流的结果:
frame collection start size (=production): 1455
frame collection end size (=production - consumption): 440
resulting list size after algorithm run (=consumption): 100
algorithm: 9242 ms
我的代码:
public class OvusculeTestConcurrent {
public final static ConcurrentLinkedQueue<Mat> frameCollection = new ConcurrentLinkedQueue<Mat>();
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
final String path = "C:\Users\Raoul\workspace\aorta2\ressource\artery_src_for_dual.avi";
long startAlgoTime = System.nanoTime();
// Constitute a frame collection in async mode
Capture cap = new Capture(path, frameCollection);
new Thread(cap).start();
Thread.sleep(3000); //leaves time to accumulate frames
System.out.println("frame collection start size (=production): "+frameCollection.size());
//Consumes the current queue in parallel/sequential
List<ImagePlus> lm = Stream.generate(() -> {
return frameCollection.poll();
})
.parallel() // comment to disable parallel computing
.limit(100L)
.map(img -> utils.PrepareImage(img,
new Point(300, 250),
new Point(450, 250),
new Point(400, 400),
0.25))
.collect(Collectors.toList());
//timing & printing the results
long endAlgoTime = System.nanoTime();
long algoDuration = (endAlgoTime - startAlgoTime)/1_000_000; //divide by 1_000_000 to get milliseconds.
System.out.println("frame collection end size (=production - consumption): "+frameCollection.size());
System.out.println("resulting list size after algorithm run (=consumption): "+lm.size());
System.out.println("algorithm: "+algoDuration+" ms");
System.exit(0);
}
}
代码中的一些事情引起了我的注意。
首先,使用 Stream.generate
创建流是正确的。最好只调用 queue.stream()
,这将 return 一个仅包含 队列当前内容 的流。你说元素在处理过程中被添加到队列中,所以那是行不通的。
一个问题是生成这样的流的代码(为清楚起见进行了编辑):
Stream.generate(() -> queue.poll())
问题出在poll
方法上,定义如下:
Retrieves and removes the head of this queue, or returns null if this queue is empty.
可能是当流 运行 并行时,流的线程可以比元素生成和插入队列更快地排空队列。如果发生这种情况,队列将在某个点清空并且流将填充 null
个元素 return poll
。
我不确定 PrepareImage
在传递 null 时做了什么,但它似乎将某些东西传递给了输出,这就是为什么你总是在目标列表中获得 100 个元素。
另一种方法是使用 BlockingQueue
实现并使用 take
方法,例如
Stream.generate(() -> queue.take())
这将避免向流中注入空值。我不确定您应该使用哪种 BlockingQueue
实现,但我建议您研究一个有大小限制的实现。如果您的生产者超过您的消费者,无界队列可能会扩展以填满所有可用内存。
不幸的是,BlockingQueue.take()
抛出 InterruptedException
所以你不能在简单的 lambda 中使用它。你必须弄清楚在中断时该怎么做。也许 return 一个虚拟元素或其他东西。
另一个问题是limit
方法对传递给下游的元素数量施加了限制,但在并行流中,多个线程可能会机会性地拉取超过流中要处理的元素数。这些由 limit
操作缓冲,直到达到其限制,此时流处理终止。从流源中提取并在达到限制时缓冲的任何元素都将被简单地丢弃。这可能就是为什么从队列中拉出超过 1,000 个元素,但最终只有 100 个元素出现在结果列表中的原因。
(但即使在连续的情况下,数字也不会相加。我不认为会发生同样的事情,当达到限制时缓冲元素被丢弃。也许是因为额外的加工过程中产生的元素?)
如果您可以忍受元素被丢弃,那么由 queue.take()
提供的并行流可能会起作用;否则,需要采用不同的方法。
我遇到过类似的问题,但在网络上找不到解决方案。想出我自己的 Spliterator
,看起来 SpliteratorWithUnknownSize 不支持并行性(没有研究,只是看到所有处理都在同一个线程中完成)。这是:
public class QueueDrainSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> elements;
public QueueDrainSpliterator(BlockingQueue<T> elements) {
this.elements = elements;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T el = elements.poll();
if (el != null) {
action.accept(el);
return true;
}
return false;
}
@Override
public Spliterator<T> trySplit() {
if (!elements.isEmpty()) {
BlockingQueue<T> split = new LinkedBlockingQueue<T>();
elements.drainTo(split, (int) Math.ceil(elements.size() / 2d));
return new QueueDrainSpliterator<T>(split);
}
return null;
}
@Override
public long estimateSize() {
return elements.size();
}
@Override
public int characteristics() {
return Spliterator.NONNULL | Spliterator.CONCURRENT;
}
}
用法示例:
StreamSupport.stream(new QueueDrainSpliterator<>(events), true)
.forEach(consumer);
当资源在队列中时,它将遍历资源。如果队列为空,它将完成。如果您需要等待生成元素,您可以尝试使用 elements.poll(long timeout, TimeUnit unit)