Java(Scala) 阻塞队列是否允许队列跳线(更高优先级)?
Could Java(Scala) blocking queue allows queue jumper(higher priority)?
可以使用 LinkedBlockingQueue
来阻止操作。假设我在队列中只有 1 个元素,并且每个元素都可以使用它。
val q = new LinkedBlockingQueue() // 1 element in it
def fun() = {
val instance = q.take()
// do some operations
}
def foo() = {
val instance = q.take()
// do some operations
}
// Use 3 threads to run following 3 methods, and the order they call q.take() is following
fun()
fun() // will wait for first fun()
foo() // will wait for second fun()
这些方法完成的顺序是fun(), fun(), foo()
但是,现在我想将foo
设置为更高的优先级,这意味着允许它成为队列跳线。 foo
可以在第二个 fun()
之前实例化(当第二个 fun
正在等待时,foo
跳到它的前面)
他们完成的顺序可以变成 fun(), foo(), fun()
,(第一个 fun
会占用实例,因为实例可用,第二个应该等待,然后 foo
也等待, 但跳到了第二个 fun
)
的前面
有可能吗?或者是否有任何其他可能的数据结构
您需要根据您的目的查找 CompletableFuture。这样就可以实现任务排序。
CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> {
return "";
}).thenApply(param -> {
return "";
}).thenApply(param -> {
return "";
});
我不知道有任何 built-in 工具可以完成这项任务,但实施起来并不难。由于您只想交换单个元素,因此不需要队列,而是交换器。
一个简单的实现可能看起来像
class SingleElementExchanger<T> {
int priorityConsumer;
T value;
public synchronized void set(T newValue) throws InterruptedException {
Objects.requireNonNull(newValue);
while(value != null) wait();
value = newValue;
notifyAll();
}
public synchronized T ordinaryGet() throws InterruptedException {
while(priorityConsumer != 0 || value == null) wait();
T received = value;
value = null;
notifyAll();
return received;
}
public synchronized T priorityGet() throws InterruptedException {
priorityConsumer++;
try {
while(value == null) wait();
T received = value;
value = null;
notifyAll();
return received;
}
finally {
priorityConsumer--;
}
}
}
对于您的两个普通消费者和一个优先消费者以及少量生产者,这可能已经足够了。
对于更多线程,您可能希望使用 Lock
,而不是使用 notifyAll()
。
,以便能够通知正确的一方
class SingleElementExchanger<T> {
final Lock lock = new ReentrantLock();
final Condition empty = lock.newCondition(),
fullNoPri = lock.newCondition(), fullPri = lock.newCondition();
int priorityConsumer;
T value;
public void set(T newValue) throws InterruptedException {
Objects.requireNonNull(newValue);
lock.lock();
try {
while(value != null) empty.await();
value = newValue;
(priorityConsumer==0? fullNoPri: fullPri).signal();
}
finally {
lock.unlock();
}
}
public T ordinaryGet() throws InterruptedException {
lock.lock();
try {
while(priorityConsumer != 0 || value == null) fullNoPri.await();
T received = value;
value = null;
empty.signal();
return received;
}
finally {
lock.unlock();
}
}
public T priorityGet() throws InterruptedException {
lock.lock();
try {
priorityConsumer++;
while(value == null) fullPri.await();
T received = value;
value = null;
empty.signal();
return received;
}
finally {
priorityConsumer--;
lock.unlock();
}
}
}
可以使用 LinkedBlockingQueue
来阻止操作。假设我在队列中只有 1 个元素,并且每个元素都可以使用它。
val q = new LinkedBlockingQueue() // 1 element in it
def fun() = {
val instance = q.take()
// do some operations
}
def foo() = {
val instance = q.take()
// do some operations
}
// Use 3 threads to run following 3 methods, and the order they call q.take() is following
fun()
fun() // will wait for first fun()
foo() // will wait for second fun()
这些方法完成的顺序是fun(), fun(), foo()
但是,现在我想将foo
设置为更高的优先级,这意味着允许它成为队列跳线。 foo
可以在第二个 fun()
之前实例化(当第二个 fun
正在等待时,foo
跳到它的前面)
他们完成的顺序可以变成 fun(), foo(), fun()
,(第一个 fun
会占用实例,因为实例可用,第二个应该等待,然后 foo
也等待, 但跳到了第二个 fun
)
有可能吗?或者是否有任何其他可能的数据结构
您需要根据您的目的查找 CompletableFuture。这样就可以实现任务排序。
CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> {
return "";
}).thenApply(param -> {
return "";
}).thenApply(param -> {
return "";
});
我不知道有任何 built-in 工具可以完成这项任务,但实施起来并不难。由于您只想交换单个元素,因此不需要队列,而是交换器。
一个简单的实现可能看起来像
class SingleElementExchanger<T> {
int priorityConsumer;
T value;
public synchronized void set(T newValue) throws InterruptedException {
Objects.requireNonNull(newValue);
while(value != null) wait();
value = newValue;
notifyAll();
}
public synchronized T ordinaryGet() throws InterruptedException {
while(priorityConsumer != 0 || value == null) wait();
T received = value;
value = null;
notifyAll();
return received;
}
public synchronized T priorityGet() throws InterruptedException {
priorityConsumer++;
try {
while(value == null) wait();
T received = value;
value = null;
notifyAll();
return received;
}
finally {
priorityConsumer--;
}
}
}
对于您的两个普通消费者和一个优先消费者以及少量生产者,这可能已经足够了。
对于更多线程,您可能希望使用 Lock
,而不是使用 notifyAll()
。
class SingleElementExchanger<T> {
final Lock lock = new ReentrantLock();
final Condition empty = lock.newCondition(),
fullNoPri = lock.newCondition(), fullPri = lock.newCondition();
int priorityConsumer;
T value;
public void set(T newValue) throws InterruptedException {
Objects.requireNonNull(newValue);
lock.lock();
try {
while(value != null) empty.await();
value = newValue;
(priorityConsumer==0? fullNoPri: fullPri).signal();
}
finally {
lock.unlock();
}
}
public T ordinaryGet() throws InterruptedException {
lock.lock();
try {
while(priorityConsumer != 0 || value == null) fullNoPri.await();
T received = value;
value = null;
empty.signal();
return received;
}
finally {
lock.unlock();
}
}
public T priorityGet() throws InterruptedException {
lock.lock();
try {
priorityConsumer++;
while(value == null) fullPri.await();
T received = value;
value = null;
empty.signal();
return received;
}
finally {
priorityConsumer--;
lock.unlock();
}
}
}