Java 8 中的 PriorityBlockingQueue 流出现故障

PriorityBlockingQueue stream in Java 8 is out of order

这两段代码的输出顺序不同。 第一篇:

while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    } else {
        return;
    }
}

第二篇:

jobQueue.stream()
        .filter(TimeoutJobRequest::isReady)
        .peek(jobQueue::remove)
        .forEach(this::execute);

请注意 jobQueuePriorityBlockingQueue

仅当 this::execute 相对较长时(例如几秒钟)才会重新排序。

第一段代码不等于第二段,当job.isReady()函数returnsfalse时,第一段终止,但第二段仍然运行,stream的函数filter只是过滤操作

你可以把第一段代码改成

while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    } 
}

PriorityBlockingQueuestream遵循Iterator顺序,根据documentation:

The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.

如果你想要优先顺序,你需要poll来自PriorityBlockingQueue的元素。

PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
pq.add(5);
pq.add(8);
pq.add(3);

System.out.println("-- Try 1 --");
pq.stream().forEach(System.out::println);

System.out.println("-- Try 2 --");
IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);

输出(可能取决于 Java 实现):

-- Try 1 --
3
8
5
-- Try 2 --
3
5
8

如果您想创建一个遵循队列顺序的流,您可以尝试以下代码(它会清空队列):

Stream.generate(jobQueue::poll).limit(jobQueue.size())

不幸的是,迭代顺序!=优先顺序。

我准备了两个可复制粘贴的解决方案,用于使用 Stream API 使用优先顺序遍历 PriorityQueue

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .limit(queue.size());
}

static <T> Stream<T> asStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    Comparator<? super T> comparator = queue.comparator();
    return comparator != null
      ? queue.stream().sorted(comparator)
      : queue.stream().sorted();
}

draintToStream清空队列,而asStream保持原始队列不变。