Java(Scala) 阻塞队列是否允许队列跳线(更高优先级)?

Could Java(Scala) blocking queue allows queue jumper(higher priority)?

可以使用 LinkedBlockingQueue 来阻止操作。假设我在队列中只有 1 个元素,并且每个元素都可以使用它。

val q = new LinkedBlockingQueue() // 1 element in it
def fun() = {
  val instance = q.take()
  // do some operations
}
def foo() = {
  val instance = q.take()
  // do some operations
}
// Use 3 threads to run following 3 methods, and the order they call q.take() is following
fun()
fun() // will wait for first fun()
foo() // will wait for second fun()

这些方法完成的顺序是fun(), fun(), foo()

但是,现在我想将foo设置为更高的优先级,这意味着允许它成为队列跳线。 foo 可以在第二个 fun() 之前实例化(当第二个 fun 正在等待时,foo 跳到它的前面)

他们完成的顺序可以变成 fun(), foo(), fun(),(第一个 fun 会占用实例,因为实例可用,第二个应该等待,然后 foo 也等待, 但跳到了第二个 fun)

的前面

有可能吗?或者是否有任何其他可能的数据结构

您需要根据您的目的查找 CompletableFuture。这样就可以实现任务排序。

CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> {
    return "";
}).thenApply(param -> {
    return "";
}).thenApply(param -> {
    return "";
});

我不知道有任何 built-in 工具可以完成这项任务,但实施起来并不难。由于您只想交换单个元素,因此不需要队列,而是交换器。

一个简单的实现可能看起来像

class SingleElementExchanger<T> {
    int priorityConsumer;
    T value;

    public synchronized void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        while(value != null) wait();
        value = newValue;
        notifyAll();
    }

    public synchronized T ordinaryGet() throws InterruptedException {
        while(priorityConsumer != 0 || value == null) wait();
        T received = value;
        value = null;
        notifyAll();
        return received;
    }

    public synchronized T priorityGet() throws InterruptedException {
        priorityConsumer++;
        try {
            while(value == null) wait();
            T received = value;
            value = null;
            notifyAll();
            return received;
        }
        finally {
            priorityConsumer--;
        }
    }
}

对于您的两个普通消费者和一个优先消费者以及少量生产者,这可能已经足够了。

对于更多线程,您可能希望使用 Lock,而不是使用 notifyAll()

,以便能够通知正确的一方
class SingleElementExchanger<T> {
    final Lock lock = new ReentrantLock();
    final Condition empty = lock.newCondition(),
        fullNoPri = lock.newCondition(), fullPri = lock.newCondition();

    int priorityConsumer;
    T value;

    public void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        lock.lock();
        try {
            while(value != null) empty.await();
            value = newValue;
            (priorityConsumer==0? fullNoPri: fullPri).signal();
        }
        finally {
            lock.unlock();
        }
    }

    public T ordinaryGet() throws InterruptedException {
        lock.lock();
        try {
            while(priorityConsumer != 0 || value == null) fullNoPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            lock.unlock();
        }
    }

    public T priorityGet() throws InterruptedException {
        lock.lock();
        try {
            priorityConsumer++;
            while(value == null) fullPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            priorityConsumer--;
            lock.unlock();
        }
    }
}