Java ExecutorService:- 事件发生时通知线程唤醒
Java ExecutorService:- Notify a thread to wake up when an event occurs
我有一个管理器 class,多个线程将自己注册到该管理器(使用 UUID
为每个请求生成唯一标识符),提供有效负载以进行处理并从管理器获得相应的响应。我正在使用 java.util.concurrent.ExecutorService
来启动多个线程。这是用于测试我的管理器功能的实现 -
public class ManagerTest {
public static void main(String[] args) {
try {
Manager myManager = new Manager();
// Start listening to the messages from different threads
myManager.consumeMessages();
int num_threads = Integer.parseInt(args[0]);
ExecutorService executor = Executors.newFixedThreadPool(num_threads);
for (int i = 0; i < num_threads; i++) {
// class implementation is given below
Runnable worker = new MyRunnable(myManager);
executor.execute(worker);
}
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
}
System.out.println("\nFinished all threads");
myManager.closeConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
下面是MyRunnable
class
的实现
class MyRunnable implements Runnable {
private Manager managerObj;
public MyRunnable(Manager managerObj) {
this.managerObj = managerObj;
}
@Override
public void run() {
try {
Random rand = new Random();
int n = rand.nextInt(35);
String requestId = UUID.randomUUID().toString();
managerObj.registerRequest(requestId, n);
managerObj.publishMessage(requestId);
// Want to avoid this while loop
while( ! managerObj.getRequestStatus(requestId)){
}
int response = managerObj.getRequestResponse(requestId);
// do something else
managerObj.unregisterRequest(requestId);
} catch (IOException e) {
e.printStackTrace();
}
}
}
管理器将处理请求,根据负载的不同,请求的响应可能需要不同的时间。每当管理器获得响应时,它都会通过调用此函数 setRequestStatus(requestId)
将请求状态设置为 true。此后线程将从 while loop
退出并继续执行。
如果代码工作正常,但线程正在做太多工作,然后需要不断循环 while 循环直到满足条件。
他们是在向管理器发送请求后让线程休眠的一种方法,并且管理器向该线程发出信号以在其响应准备就绪时唤醒。
请原谅我,如果这对某些人来说太简单了,我是 java 和 java 线程接口的新手。
没关系,我们习惯了简单的问题,而你的问题其实写得很好,有一个合理的问题要解决,我们这里每天都看到很多糟糕的事情,就像人们不知道自己想要什么,怎么问等等
所以,你正在做的是一个繁忙的自旋循环,这是一件非常糟糕的事情,因为 a) 它每个线程消耗了一个完整的 CPU 内核,并且 b) 它实际上保持 CPU 忙碌,这意味着它正在从可能有有用工作要做的其他线程窃取处理时间。
有很多方法可以解决这个问题,我会从最差到最好列出它们。
改进代码的最简单方法是调用 java.lang.Thread.sleep(long millis)
方法,将其作为参数传递 0
。这也被称为 "yield" 操作,它本质上意味着 "if there are any other threads that have some useful work to do, let them run, and return back to me once they are done." 这只比忙自旋循环稍微好一点,因为它仍然会消耗 100% CPU。好处是它只会消耗CPU而其他线程无事可做,所以至少不会拖慢其他事情。
改进代码的下一个最佳但仍然不是很聪明的方法是调用 java.lang.Thread.sleep(long millis)
方法,将其作为参数传递给 1
。这被称为 "pass" 操作,它本质上意味着 "release the remainder of my time-slice to any other threads that might have some useful work to do"。换句话说,剩余的时间片被没收,即使整个系统不需要做任何有用的工作。这将使 CPU 消耗几乎为零。缺点是 a) CPU 消耗实际上会略高于零,b) 机器将无法进入某种低功耗睡眠模式,以及 c) 您的工作线程的响应速度会稍慢:它将仅在时间片边界上完成工作。
解决问题的最佳方法是使用 java 中内置的 同步 机制,如以下答案所述: 这样不但不会消耗零CPU,甚至可以让机器在等待的时候进入低功耗模式
要解决最常见情况下的问题,您不想只等到某个条件,但实际上还要传递有关要完成的工作的信息,您可以使用 BlockingQueue
。 (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html)阻塞队列使用java内置同步机制和允许一个线程将信息传递给另一个。
@MikeNakis 已经给出了很好的答案,但我也倾向于提供另一种选择。
然而,此选项涉及将管理器 API 更改为 return a Future。
我建议对经理进行的更改如下:
- 放弃
getRequestStatus()
方法
- 制作
getRequestResponse()
returnFuture<Integer>
进行这些更改后 MyRunnable
的 run()
方法可以更改为:
public void run() {
try {
Random rand = new Random();
int n = rand.nextInt(35);
String requestId = UUID.randomUUID().toString();
managerObj.registerRequest(requestId, n);
managerObj.publishMessage(requestId);
// Future.get() blocks and waits for the result without consuming CPU
int response = managerObj.getRequestResponse(requestId).get();
// do something else
managerObj.unregisterRequest(requestId);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
实现 Future<>
的最简单方法可能是使用 Java 的 java.util.concurrent.FutureTask
。
我有一个管理器 class,多个线程将自己注册到该管理器(使用 UUID
为每个请求生成唯一标识符),提供有效负载以进行处理并从管理器获得相应的响应。我正在使用 java.util.concurrent.ExecutorService
来启动多个线程。这是用于测试我的管理器功能的实现 -
public class ManagerTest {
public static void main(String[] args) {
try {
Manager myManager = new Manager();
// Start listening to the messages from different threads
myManager.consumeMessages();
int num_threads = Integer.parseInt(args[0]);
ExecutorService executor = Executors.newFixedThreadPool(num_threads);
for (int i = 0; i < num_threads; i++) {
// class implementation is given below
Runnable worker = new MyRunnable(myManager);
executor.execute(worker);
}
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
}
System.out.println("\nFinished all threads");
myManager.closeConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
下面是MyRunnable
class
class MyRunnable implements Runnable {
private Manager managerObj;
public MyRunnable(Manager managerObj) {
this.managerObj = managerObj;
}
@Override
public void run() {
try {
Random rand = new Random();
int n = rand.nextInt(35);
String requestId = UUID.randomUUID().toString();
managerObj.registerRequest(requestId, n);
managerObj.publishMessage(requestId);
// Want to avoid this while loop
while( ! managerObj.getRequestStatus(requestId)){
}
int response = managerObj.getRequestResponse(requestId);
// do something else
managerObj.unregisterRequest(requestId);
} catch (IOException e) {
e.printStackTrace();
}
}
}
管理器将处理请求,根据负载的不同,请求的响应可能需要不同的时间。每当管理器获得响应时,它都会通过调用此函数 setRequestStatus(requestId)
将请求状态设置为 true。此后线程将从 while loop
退出并继续执行。
如果代码工作正常,但线程正在做太多工作,然后需要不断循环 while 循环直到满足条件。
他们是在向管理器发送请求后让线程休眠的一种方法,并且管理器向该线程发出信号以在其响应准备就绪时唤醒。
请原谅我,如果这对某些人来说太简单了,我是 java 和 java 线程接口的新手。
没关系,我们习惯了简单的问题,而你的问题其实写得很好,有一个合理的问题要解决,我们这里每天都看到很多糟糕的事情,就像人们不知道自己想要什么,怎么问等等
所以,你正在做的是一个繁忙的自旋循环,这是一件非常糟糕的事情,因为 a) 它每个线程消耗了一个完整的 CPU 内核,并且 b) 它实际上保持 CPU 忙碌,这意味着它正在从可能有有用工作要做的其他线程窃取处理时间。
有很多方法可以解决这个问题,我会从最差到最好列出它们。
改进代码的最简单方法是调用
java.lang.Thread.sleep(long millis)
方法,将其作为参数传递0
。这也被称为 "yield" 操作,它本质上意味着 "if there are any other threads that have some useful work to do, let them run, and return back to me once they are done." 这只比忙自旋循环稍微好一点,因为它仍然会消耗 100% CPU。好处是它只会消耗CPU而其他线程无事可做,所以至少不会拖慢其他事情。改进代码的下一个最佳但仍然不是很聪明的方法是调用
java.lang.Thread.sleep(long millis)
方法,将其作为参数传递给1
。这被称为 "pass" 操作,它本质上意味着 "release the remainder of my time-slice to any other threads that might have some useful work to do"。换句话说,剩余的时间片被没收,即使整个系统不需要做任何有用的工作。这将使 CPU 消耗几乎为零。缺点是 a) CPU 消耗实际上会略高于零,b) 机器将无法进入某种低功耗睡眠模式,以及 c) 您的工作线程的响应速度会稍慢:它将仅在时间片边界上完成工作。解决问题的最佳方法是使用 java 中内置的 同步 机制,如以下答案所述: 这样不但不会消耗零CPU,甚至可以让机器在等待的时候进入低功耗模式
要解决最常见情况下的问题,您不想只等到某个条件,但实际上还要传递有关要完成的工作的信息,您可以使用
BlockingQueue
。 (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html)阻塞队列使用java内置同步机制和允许一个线程将信息传递给另一个。
@MikeNakis 已经给出了很好的答案,但我也倾向于提供另一种选择。
然而,此选项涉及将管理器 API 更改为 return a Future。
我建议对经理进行的更改如下:
- 放弃
getRequestStatus()
方法 - 制作
getRequestResponse()
returnFuture<Integer>
进行这些更改后 MyRunnable
的 run()
方法可以更改为:
public void run() {
try {
Random rand = new Random();
int n = rand.nextInt(35);
String requestId = UUID.randomUUID().toString();
managerObj.registerRequest(requestId, n);
managerObj.publishMessage(requestId);
// Future.get() blocks and waits for the result without consuming CPU
int response = managerObj.getRequestResponse(requestId).get();
// do something else
managerObj.unregisterRequest(requestId);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
实现 Future<>
的最简单方法可能是使用 Java 的 java.util.concurrent.FutureTask
。