Java 8 中的 PriorityBlockingQueue 流出现故障
PriorityBlockingQueue stream in Java 8 is out of order
这两段代码的输出顺序不同。
第一篇:
while(!jobQueue.isEmpty()) {
TimeoutJobRequest job = jobQueue.peek();
if(job.isReady()) {
execute(job);
jobQueue.poll();
} else {
return;
}
}
第二篇:
jobQueue.stream()
.filter(TimeoutJobRequest::isReady)
.peek(jobQueue::remove)
.forEach(this::execute);
请注意 jobQueue
是 PriorityBlockingQueue
。
仅当 this::execute
相对较长时(例如几秒钟)才会重新排序。
第一段代码不等于第二段,当job.isReady()
函数returnsfalse
时,第一段终止,但第二段仍然运行,stream的函数filter
只是过滤操作
你可以把第一段代码改成
while(!jobQueue.isEmpty()) {
TimeoutJobRequest job = jobQueue.peek();
if(job.isReady()) {
execute(job);
jobQueue.poll();
}
}
PriorityBlockingQueue
的stream
遵循Iterator
顺序,根据documentation:
The Iterator provided in method iterator() is not guaranteed to
traverse the elements of the PriorityBlockingQueue in any particular
order.
如果你想要优先顺序,你需要poll
来自PriorityBlockingQueue
的元素。
PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
pq.add(5);
pq.add(8);
pq.add(3);
System.out.println("-- Try 1 --");
pq.stream().forEach(System.out::println);
System.out.println("-- Try 2 --");
IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);
输出(可能取决于 Java 实现):
-- Try 1 --
3
8
5
-- Try 2 --
3
5
8
如果您想创建一个遵循队列顺序的流,您可以尝试以下代码(它会清空队列):
Stream.generate(jobQueue::poll).limit(jobQueue.size())
不幸的是,迭代顺序!=优先顺序。
我准备了两个可复制粘贴的解决方案,用于使用 Stream API 使用优先顺序遍历 PriorityQueue
:
static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
Objects.requireNonNull(queue);
return Stream.generate(queue::poll)
.limit(queue.size());
}
static <T> Stream<T> asStream(PriorityQueue<T> queue) {
Objects.requireNonNull(queue);
Comparator<? super T> comparator = queue.comparator();
return comparator != null
? queue.stream().sorted(comparator)
: queue.stream().sorted();
}
draintToStream
清空队列,而asStream
保持原始队列不变。
这两段代码的输出顺序不同。 第一篇:
while(!jobQueue.isEmpty()) {
TimeoutJobRequest job = jobQueue.peek();
if(job.isReady()) {
execute(job);
jobQueue.poll();
} else {
return;
}
}
第二篇:
jobQueue.stream()
.filter(TimeoutJobRequest::isReady)
.peek(jobQueue::remove)
.forEach(this::execute);
请注意 jobQueue
是 PriorityBlockingQueue
。
仅当 this::execute
相对较长时(例如几秒钟)才会重新排序。
第一段代码不等于第二段,当job.isReady()
函数returnsfalse
时,第一段终止,但第二段仍然运行,stream的函数filter
只是过滤操作
你可以把第一段代码改成
while(!jobQueue.isEmpty()) {
TimeoutJobRequest job = jobQueue.peek();
if(job.isReady()) {
execute(job);
jobQueue.poll();
}
}
PriorityBlockingQueue
的stream
遵循Iterator
顺序,根据documentation:
The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.
如果你想要优先顺序,你需要poll
来自PriorityBlockingQueue
的元素。
PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
pq.add(5);
pq.add(8);
pq.add(3);
System.out.println("-- Try 1 --");
pq.stream().forEach(System.out::println);
System.out.println("-- Try 2 --");
IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);
输出(可能取决于 Java 实现):
-- Try 1 --
3
8
5
-- Try 2 --
3
5
8
如果您想创建一个遵循队列顺序的流,您可以尝试以下代码(它会清空队列):
Stream.generate(jobQueue::poll).limit(jobQueue.size())
不幸的是,迭代顺序!=优先顺序。
我准备了两个可复制粘贴的解决方案,用于使用 Stream API 使用优先顺序遍历 PriorityQueue
:
static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
Objects.requireNonNull(queue);
return Stream.generate(queue::poll)
.limit(queue.size());
}
static <T> Stream<T> asStream(PriorityQueue<T> queue) {
Objects.requireNonNull(queue);
Comparator<? super T> comparator = queue.comparator();
return comparator != null
? queue.stream().sorted(comparator)
: queue.stream().sorted();
}
draintToStream
清空队列,而asStream
保持原始队列不变。