单进程阻塞队列
Single process blocking queue
我正在编写一个与硬件通信的应用程序。虽然应用程序可以同时并行接收和处理多个请求,但硬件不能!
硬件要求这些并行请求基本上组织成一个线性请求链,每个请求一个接一个地执行。
我还需要能够对请求进行优先级排序,因为有些是不紧急的后台进程,有些是活动的,需要跳到队列的前面以便立即处理。
我对队列没有太多经验,但是如果这样的库还不存在,我会感到惊讶。
见https://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html
我建议对您的请求使用包装器,该包装器具有专门针对此队列的优先级值。例如,您可以使用 Long 作为该值,计算
value = timestamp % N * priorityLevel
N 取决于您处理事件需要多长时间
priorityLevel 值越低表示越紧急(大于零)
编辑:在评论中说明后
您似乎需要创建实例
ThreadPoolExecutor
并将您自己的队列传递给它,该队列将是 PriorityBlockingQueue. Task you put into this pool need to implement Comparable 的实例,它将按执行优先级对它们进行排序。
见一点old reference,但作为灵感应该足够了。
编辑:建议的优先级函数对于较小的 N 是危险的,现在看数字,long 可以在溢出发生之前乘以很多,所以将模数排除在外将无济于事更少,特别是如果你只有两个优先级(抱歉神秘化)
编辑:实施建议的解决方案
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class QTest {
public static void main(String[] args){
//create executor with exactly one thread (first four arguments) that is
//using priority queue to store tasks (it takes care of sorting by priority)
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new PriorityBlockingQueue());
executor.execute(new EventWrapper(1, "A"));
executor.execute(new EventWrapper(2, "B"));
executor.execute(new EventWrapper(1, "C"));
executor.execute(new EventWrapper(3, "D"));
executor.execute(new EventWrapper(1, "E"));
//just to have it terminated once test is done
executor.shutdown();
}
}
//in this wrapper should be loaded anything you want to have executed
class EventWrapper implements Comparable<EventWrapper>, Runnable{
public final long priority;
//name just to recognize what is being executed
public final String name;
public EventWrapper(int priority, String name){
//priority function out of current time, can be obviously inserted from elsewhere
this.priority = priority*System.currentTimeMillis();
this.name = name;
}
@Override
public int compareTo(EventWrapper that) {
//lower priority first
if(this.priority==that.priority)return 0;
return this.priority>that.priority?1:-1;
}
@Override
public void run() {
System.out.println("Executing task "+name+" with priority "+priority);
//sleep to rule out speed of insertion in executor
try {Thread.sleep(1000);
} catch (InterruptedException ex) {}
}
}
创建任务的结果是
Executing task A with priority 1433276819484
Executing task C with priority 1433276819485
Executing task E with priority 1433276819485
Executing task B with priority 2866553638970
Executing task D with priority 4299830458455
如果您已经熟悉 PriorityBlockingQueue
,为什么不简单地轮询它来处理硬件请求?
public class HardwareHandler
public static final PriorityBlockingQueue<Message> queue =
new PriorityBlockingQueue<Message>();
static {
while (true) {
Message m = queue.take();
handleMessage(m);
}
}
private static void handleMessage(Message m) {
// handle message....
}
}
根据所有有用的评论和答案,我确定可能没有针对此问题的预构建解决方案。为了尝试对问题提供全面的答案,我尝试使用 PriorityBlockingQueue
.
编写自己的实现
我在 StackExchange Code Review 上发布了代码,您可以看到完整的代码和任何社区建议的改进。
我正在编写一个与硬件通信的应用程序。虽然应用程序可以同时并行接收和处理多个请求,但硬件不能!
硬件要求这些并行请求基本上组织成一个线性请求链,每个请求一个接一个地执行。
我还需要能够对请求进行优先级排序,因为有些是不紧急的后台进程,有些是活动的,需要跳到队列的前面以便立即处理。
我对队列没有太多经验,但是如果这样的库还不存在,我会感到惊讶。
见https://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html
我建议对您的请求使用包装器,该包装器具有专门针对此队列的优先级值。例如,您可以使用 Long 作为该值,计算
value = timestamp % N * priorityLevel
N 取决于您处理事件需要多长时间
priorityLevel 值越低表示越紧急(大于零)
编辑:在评论中说明后
您似乎需要创建实例 ThreadPoolExecutor 并将您自己的队列传递给它,该队列将是 PriorityBlockingQueue. Task you put into this pool need to implement Comparable 的实例,它将按执行优先级对它们进行排序。
见一点old reference,但作为灵感应该足够了。
编辑:建议的优先级函数对于较小的 N 是危险的,现在看数字,long 可以在溢出发生之前乘以很多,所以将模数排除在外将无济于事更少,特别是如果你只有两个优先级(抱歉神秘化)
编辑:实施建议的解决方案
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class QTest {
public static void main(String[] args){
//create executor with exactly one thread (first four arguments) that is
//using priority queue to store tasks (it takes care of sorting by priority)
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new PriorityBlockingQueue());
executor.execute(new EventWrapper(1, "A"));
executor.execute(new EventWrapper(2, "B"));
executor.execute(new EventWrapper(1, "C"));
executor.execute(new EventWrapper(3, "D"));
executor.execute(new EventWrapper(1, "E"));
//just to have it terminated once test is done
executor.shutdown();
}
}
//in this wrapper should be loaded anything you want to have executed
class EventWrapper implements Comparable<EventWrapper>, Runnable{
public final long priority;
//name just to recognize what is being executed
public final String name;
public EventWrapper(int priority, String name){
//priority function out of current time, can be obviously inserted from elsewhere
this.priority = priority*System.currentTimeMillis();
this.name = name;
}
@Override
public int compareTo(EventWrapper that) {
//lower priority first
if(this.priority==that.priority)return 0;
return this.priority>that.priority?1:-1;
}
@Override
public void run() {
System.out.println("Executing task "+name+" with priority "+priority);
//sleep to rule out speed of insertion in executor
try {Thread.sleep(1000);
} catch (InterruptedException ex) {}
}
}
创建任务的结果是
Executing task A with priority 1433276819484
Executing task C with priority 1433276819485
Executing task E with priority 1433276819485
Executing task B with priority 2866553638970
Executing task D with priority 4299830458455
如果您已经熟悉 PriorityBlockingQueue
,为什么不简单地轮询它来处理硬件请求?
public class HardwareHandler
public static final PriorityBlockingQueue<Message> queue =
new PriorityBlockingQueue<Message>();
static {
while (true) {
Message m = queue.take();
handleMessage(m);
}
}
private static void handleMessage(Message m) {
// handle message....
}
}
根据所有有用的评论和答案,我确定可能没有针对此问题的预构建解决方案。为了尝试对问题提供全面的答案,我尝试使用 PriorityBlockingQueue
.
我在 StackExchange Code Review 上发布了代码,您可以看到完整的代码和任何社区建议的改进。