Java、线程和优先级
Java, Thread and Priorities
我正在解决一个问题,本来应该很容易解决,但我没那么容易解决。
问题很简单:我在 Linux/x86 上有一个 Java 程序 运行,它可以执行两个基本功能 F1 和 F2。我想将 F1 设置为具有更高的优先级,即使 F2 有时必须执行,即 F1 请求在队列中的事实不能让 F2 请求永远等待。
我的第一个想法是为每个功能设置单独的队列和一个线程池,我将 F1 池设置为有 8 个线程,而 F2 池只有 2 个线程。
根据我的预期,linux 会为每个线程提供公平的时间份额,因此 F1 将有 8 个量程,而 F2 将只有 2 个。如果没有 F1 请求,F2 池可以将每个量程分配给自己, F1 也应如此,以防 F2 没有请求。
但是,程序不是那样运行的,如果我收到一连串的 F2 请求和几个 F1 请求,后者需要很长时间才能轮到它。
谈论 Oracle HotSpot/linux 调度有意义吗?或者它不应该发生,什么会指向我的实施错误?
PS: 我读过 linux 调度,似乎 SCHED_OTHER (TS) 为每个任务提供时间份额,但是每次任务就绪时都没有执行它会获得更大的量程,如果 F2 池发生这种情况,那可能可以解释上述行为。
感谢和问候。
下面有一个示例源代码。
package test;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Created by giscardff on 08/07/18.
*/
public class TestThread {
// Test Program
public static void main(String args[]) throws Exception {
// queues containing jobs to be done
ArrayBlockingQueue<MyDTO> queueA = new ArrayBlockingQueue<>(100);
ArrayBlockingQueue<MyDTO> queueB = new ArrayBlockingQueue<>(100);
// create pool for functionality A
for(int i = 1; i <= 8; i++){
MyThread thread = new MyThread("ThreadA" + i, queueA);
thread.start();
}
// create pool for functionality B
for(int i = 1; i <= 2; i++){
MyThread thread = new MyThread("ThreadB" + i, queueB);
thread.start();
}
// create producer for A
// it will take 100ms between requests
Producer producerA = new Producer(queueA, 0);
producerA.start();
// create producer for B
// it will take 0ms between requests
Producer producerB = new Producer(queueB, 0);
producerB.start();
}
}
/**
* Just put a request into a queue
*/
class Producer extends Thread {
private ArrayBlockingQueue<MyDTO> queue;
private long sleep;
public Producer(ArrayBlockingQueue<MyDTO> queue, long sleep){
this.queue = queue;
this.sleep = sleep;
}
@Override
public void run() {
try {
while (true) {
if(sleep > 0)Thread.sleep(sleep);
queue.put(new MyDTO());
}
}catch(Exception ex){}
}
}
/**
* Retrieve a request from a queue, calculate how long request took to
* be received for each 1M requests
*/
class MyThread extends Thread {
private ArrayBlockingQueue<MyDTO> queue;
private long delay = 0;
private int count = 0;
public MyThread(String name, ArrayBlockingQueue<MyDTO> queue){
super(name);
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
MyDTO input = queue.take();
delay += System.currentTimeMillis() - Long.parseLong(input.getTime());
if(++count % 1000 == 0){
System.out.printf("%s: %d\n", getName(), delay / 10);
count = 0;
}
}
}catch(Exception ex){ex.printStackTrace();}
}
}
/**
* Just a DTO representing a request
* NOTE: The time was set as String to force CPU to do something more than just math operations
*/
class MyDTO {
private String time;
public MyDTO(){
this.time = "" + System.currentTimeMillis();
}
public String getTime() {
return time;
}
}
您似乎遇到了一些问题。我将尝试总结它们并提供前进道路的起点:
线程争用
使用 BlockingQueue
是有代价的——每个写操作(put & take)都在生产者或消费者之间进行锁竞争。您的 "A pool" 有 9 个线程争用 queueA
的写锁(1 个生产者,8 个消费者),而您的 "B pool" 有 3 个线程争用 queueB
的锁(1生产者,2个消费者)。
This related answer 提供了有关争用的更多详细信息。解决此问题的最简单方法是 "use less threads" 或使用 "lock-free" 机制来消除争用。
线程调度
如评论中所述,您受制于 JVM 如何安排您的线程。
如果 java 线程调度在 CPU 上使用完全公平的时间份额,您可能会看到同一池中每个线程的消耗计数彼此非常接近。您可能已经注意到它们不是 - 我运行您的(稍作修改的)代码偶尔会给我 300K 或更多线程的计数分布。
当每个 CPU 绑定线程有足够的 CPU 内核时(您的示例代码中有 12 个),您通常可以做得更好,但在许多情况下它远非理想情况下,尤其是在面对线程争用时。
你能做什么?
- 构建您自己的公平逻辑 - 不要依赖 JVM 线程调度程序来实现公平,因为它不会。
- 对于您的情况,一个简单的想法是保留两个队列,但使用一个池来处理两个队列 - 使用循环法或
Math.random()
(即:if (rand < 0.8) { queueA.poll();}
)来确定哪个要从中轮询的队列。 注意 - 使用poll
可以轻松处理队列为空的情况而不会阻塞。
- 在您的硬件上试验 CPU 绑定线程数 运行。根据我对上面 (1) 的建议,您甚至可以让一个工作线程公平地处理两个队列。请记住,过多的线程争用相同的资源会减慢您的处理速度。
线程是不是很有趣? :)
我正在解决一个问题,本来应该很容易解决,但我没那么容易解决。
问题很简单:我在 Linux/x86 上有一个 Java 程序 运行,它可以执行两个基本功能 F1 和 F2。我想将 F1 设置为具有更高的优先级,即使 F2 有时必须执行,即 F1 请求在队列中的事实不能让 F2 请求永远等待。
我的第一个想法是为每个功能设置单独的队列和一个线程池,我将 F1 池设置为有 8 个线程,而 F2 池只有 2 个线程。
根据我的预期,linux 会为每个线程提供公平的时间份额,因此 F1 将有 8 个量程,而 F2 将只有 2 个。如果没有 F1 请求,F2 池可以将每个量程分配给自己, F1 也应如此,以防 F2 没有请求。
但是,程序不是那样运行的,如果我收到一连串的 F2 请求和几个 F1 请求,后者需要很长时间才能轮到它。
谈论 Oracle HotSpot/linux 调度有意义吗?或者它不应该发生,什么会指向我的实施错误?
PS: 我读过 linux 调度,似乎 SCHED_OTHER (TS) 为每个任务提供时间份额,但是每次任务就绪时都没有执行它会获得更大的量程,如果 F2 池发生这种情况,那可能可以解释上述行为。
感谢和问候。
下面有一个示例源代码。
package test;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Created by giscardff on 08/07/18.
*/
public class TestThread {
// Test Program
public static void main(String args[]) throws Exception {
// queues containing jobs to be done
ArrayBlockingQueue<MyDTO> queueA = new ArrayBlockingQueue<>(100);
ArrayBlockingQueue<MyDTO> queueB = new ArrayBlockingQueue<>(100);
// create pool for functionality A
for(int i = 1; i <= 8; i++){
MyThread thread = new MyThread("ThreadA" + i, queueA);
thread.start();
}
// create pool for functionality B
for(int i = 1; i <= 2; i++){
MyThread thread = new MyThread("ThreadB" + i, queueB);
thread.start();
}
// create producer for A
// it will take 100ms between requests
Producer producerA = new Producer(queueA, 0);
producerA.start();
// create producer for B
// it will take 0ms between requests
Producer producerB = new Producer(queueB, 0);
producerB.start();
}
}
/**
* Just put a request into a queue
*/
class Producer extends Thread {
private ArrayBlockingQueue<MyDTO> queue;
private long sleep;
public Producer(ArrayBlockingQueue<MyDTO> queue, long sleep){
this.queue = queue;
this.sleep = sleep;
}
@Override
public void run() {
try {
while (true) {
if(sleep > 0)Thread.sleep(sleep);
queue.put(new MyDTO());
}
}catch(Exception ex){}
}
}
/**
* Retrieve a request from a queue, calculate how long request took to
* be received for each 1M requests
*/
class MyThread extends Thread {
private ArrayBlockingQueue<MyDTO> queue;
private long delay = 0;
private int count = 0;
public MyThread(String name, ArrayBlockingQueue<MyDTO> queue){
super(name);
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
MyDTO input = queue.take();
delay += System.currentTimeMillis() - Long.parseLong(input.getTime());
if(++count % 1000 == 0){
System.out.printf("%s: %d\n", getName(), delay / 10);
count = 0;
}
}
}catch(Exception ex){ex.printStackTrace();}
}
}
/**
* Just a DTO representing a request
* NOTE: The time was set as String to force CPU to do something more than just math operations
*/
class MyDTO {
private String time;
public MyDTO(){
this.time = "" + System.currentTimeMillis();
}
public String getTime() {
return time;
}
}
您似乎遇到了一些问题。我将尝试总结它们并提供前进道路的起点:
线程争用
使用 BlockingQueue
是有代价的——每个写操作(put & take)都在生产者或消费者之间进行锁竞争。您的 "A pool" 有 9 个线程争用 queueA
的写锁(1 个生产者,8 个消费者),而您的 "B pool" 有 3 个线程争用 queueB
的锁(1生产者,2个消费者)。
This related answer 提供了有关争用的更多详细信息。解决此问题的最简单方法是 "use less threads" 或使用 "lock-free" 机制来消除争用。
线程调度
如评论中所述,您受制于 JVM 如何安排您的线程。
如果 java 线程调度在 CPU 上使用完全公平的时间份额,您可能会看到同一池中每个线程的消耗计数彼此非常接近。您可能已经注意到它们不是 - 我运行您的(稍作修改的)代码偶尔会给我 300K 或更多线程的计数分布。
当每个 CPU 绑定线程有足够的 CPU 内核时(您的示例代码中有 12 个),您通常可以做得更好,但在许多情况下它远非理想情况下,尤其是在面对线程争用时。
你能做什么?
- 构建您自己的公平逻辑 - 不要依赖 JVM 线程调度程序来实现公平,因为它不会。
- 对于您的情况,一个简单的想法是保留两个队列,但使用一个池来处理两个队列 - 使用循环法或
Math.random()
(即:if (rand < 0.8) { queueA.poll();}
)来确定哪个要从中轮询的队列。 注意 - 使用poll
可以轻松处理队列为空的情况而不会阻塞。
- 对于您的情况,一个简单的想法是保留两个队列,但使用一个池来处理两个队列 - 使用循环法或
- 在您的硬件上试验 CPU 绑定线程数 运行。根据我对上面 (1) 的建议,您甚至可以让一个工作线程公平地处理两个队列。请记住,过多的线程争用相同的资源会减慢您的处理速度。
线程是不是很有趣? :)