在 ConcurrentMap 中存储线程安全吗?
is it safe to store threads in a ConcurrentMap?
我正在构建一个后端服务,通过对我的服务的 REST 调用创建一个新线程。如果线程在 5 分钟内没有收到任何消息,线程将等待另一个 REST 调用,线程将死亡。
为了跟踪所有线程,我有一个集合来跟踪所有当前 运行 线程,以便当 REST 调用最终进入时(例如用户接受或拒绝某个操作),我可以识别该线程使用用户 ID。如果它被拒绝,我们将只从集合中删除该线程,如果它被接受,该线程可以继续执行下一个操作。我已经使用 ConcurrentMap 实现了这个以避免并发问题。
因为这是我第一次使用线程,所以我想确保我没有忽略可能出现的任何问题。请查看我的代码并告诉我是否可以做得更好或者是否存在任何缺陷。
public class UserAction extends Thread {
int userID;
boolean isAccepted = false;
boolean isDeclined = false;
long timeNow = System.currentTimeMillis();
long timeElapsed = timeNow + 50000;
public UserAction(int userID) {
this.userID = userID;
}
public void declineJob() {
this.isDeclined = true;
}
public void acceptJob() {
this.isAccepted = true;
}
public boolean waitForApproval(){
while (System.currentTimeMillis() < timeElapsed){
System.out.println("waiting for approval");
if (isAccepted) {
return true;
} else if (declined) {
return false;
}
}
return isAccepted;
}
@Override
public void run() {
if (!waitForApproval) {
// mustve timed out or user declined so remove from list and return thread immediately
tCollection.remove(userID);
// end the thread here
return;
}
// mustve been accepted so continue working
}
}
public class Controller {
public static ConcurrentHashMap<Integer, Thread> tCollection = new ConcurrentHashMap<>();
public static void main(String[] args) {
int barberID1 = 1;
int barberID2 = 2;
tCollection.put(barberID1, new UserAction(barberID1));
tCollection.put(barberID2, new UserAction(barberID2));
tCollection.get(barberID1).start();
tCollection.get(barberID2).start();
Thread.sleep(1000);
// simulate REST call accepting/declining job after 1 second. Usually this would be in a spring mvc RESTcontroller in a different class.
tCollection.get(barberID1).acceptJob();
tCollection.get(barberID2).declineJob();
}
}
为此您不需要(显式)线程。只是在第一次 rest 调用时创建的任务对象的共享池。
当第二个 rest 调用到来时,您已经有一个线程可以使用(处理 rest 调用的线程)。您只需要根据用户 ID 检索任务对象即可。您还需要摆脱过期的任务,这可以通过例如 DelayQueue
.
来完成
伪代码:
public void rest1(User u) {
UserTask ut = new UserTask(u);
pool.put(u.getId(), ut);
delayPool.put(ut); // Assuming UserTask implements Delayed with a 5 minute delay
}
public void rest2(User u, Action a) {
UserTask ut = pool.get(u.getId());
if(!a.isAccepted() || ut == null)
pool.remove(u.getId());
else
process(ut);
// Clean up the pool from any expired tasks, can also be done in the beginning
// of the method, if you want to make sure that expired actions aren't performed
while((UserTask u = delayPool.poll()) != null)
pool.remove(u.getId());
}
存在一个同步问题,您应该将 isAccepted
和 isDeclined
标记为 class AtomicBoolean
。
一个关键概念是您需要采取措施确保将一个线程中的内存更改传达给需要该数据的其他线程。它们被称为内存栅栏,它们通常隐式出现在同步调用之间。
具有 'central memory' 的(简单)冯诺依曼架构的想法对于大多数现代机器来说是错误的,您需要知道数据在 caches/threads 之间正确共享。
也正如其他人所建议的,为每个任务创建一个线程是一个糟糕的模型。如果提交的任务太多,它的扩展性很差,并且会使您的应用程序容易崩溃。内存有一些限制,所以你一次只能有这么多挂起的任务,但线程的上限会低得多。
这将变得更糟,因为你在等待。自旋等待将线程置于循环中等待条件。更好的模型会在 ConditionVariable
上等待,因此操作系统可以暂停不做任何事情(除了等待)的线程,直到通知它们正在等待的东西已经(或可能)准备就绪。
创建和销毁线程通常需要大量的时间和资源开销。考虑到大多数平台可以同时只执行相对较少的线程,创建大量 'expensive' 线程以让它们花费大部分时间换出(挂起)什么都不做是非常低效的。
正确的模型启动一个固定数量(或相对固定数量)的线程池,并将任务放置在线程 'take' 从中工作和处理的共享队列中。
该模型通常称为 "Thread Pool"。
您应该查看的入门级实现是 ThreadPoolExecutor
:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
我正在构建一个后端服务,通过对我的服务的 REST 调用创建一个新线程。如果线程在 5 分钟内没有收到任何消息,线程将等待另一个 REST 调用,线程将死亡。 为了跟踪所有线程,我有一个集合来跟踪所有当前 运行 线程,以便当 REST 调用最终进入时(例如用户接受或拒绝某个操作),我可以识别该线程使用用户 ID。如果它被拒绝,我们将只从集合中删除该线程,如果它被接受,该线程可以继续执行下一个操作。我已经使用 ConcurrentMap 实现了这个以避免并发问题。
因为这是我第一次使用线程,所以我想确保我没有忽略可能出现的任何问题。请查看我的代码并告诉我是否可以做得更好或者是否存在任何缺陷。
public class UserAction extends Thread {
int userID;
boolean isAccepted = false;
boolean isDeclined = false;
long timeNow = System.currentTimeMillis();
long timeElapsed = timeNow + 50000;
public UserAction(int userID) {
this.userID = userID;
}
public void declineJob() {
this.isDeclined = true;
}
public void acceptJob() {
this.isAccepted = true;
}
public boolean waitForApproval(){
while (System.currentTimeMillis() < timeElapsed){
System.out.println("waiting for approval");
if (isAccepted) {
return true;
} else if (declined) {
return false;
}
}
return isAccepted;
}
@Override
public void run() {
if (!waitForApproval) {
// mustve timed out or user declined so remove from list and return thread immediately
tCollection.remove(userID);
// end the thread here
return;
}
// mustve been accepted so continue working
}
}
public class Controller {
public static ConcurrentHashMap<Integer, Thread> tCollection = new ConcurrentHashMap<>();
public static void main(String[] args) {
int barberID1 = 1;
int barberID2 = 2;
tCollection.put(barberID1, new UserAction(barberID1));
tCollection.put(barberID2, new UserAction(barberID2));
tCollection.get(barberID1).start();
tCollection.get(barberID2).start();
Thread.sleep(1000);
// simulate REST call accepting/declining job after 1 second. Usually this would be in a spring mvc RESTcontroller in a different class.
tCollection.get(barberID1).acceptJob();
tCollection.get(barberID2).declineJob();
}
}
为此您不需要(显式)线程。只是在第一次 rest 调用时创建的任务对象的共享池。
当第二个 rest 调用到来时,您已经有一个线程可以使用(处理 rest 调用的线程)。您只需要根据用户 ID 检索任务对象即可。您还需要摆脱过期的任务,这可以通过例如 DelayQueue
.
伪代码:
public void rest1(User u) {
UserTask ut = new UserTask(u);
pool.put(u.getId(), ut);
delayPool.put(ut); // Assuming UserTask implements Delayed with a 5 minute delay
}
public void rest2(User u, Action a) {
UserTask ut = pool.get(u.getId());
if(!a.isAccepted() || ut == null)
pool.remove(u.getId());
else
process(ut);
// Clean up the pool from any expired tasks, can also be done in the beginning
// of the method, if you want to make sure that expired actions aren't performed
while((UserTask u = delayPool.poll()) != null)
pool.remove(u.getId());
}
存在一个同步问题,您应该将 isAccepted
和 isDeclined
标记为 class AtomicBoolean
。
一个关键概念是您需要采取措施确保将一个线程中的内存更改传达给需要该数据的其他线程。它们被称为内存栅栏,它们通常隐式出现在同步调用之间。
具有 'central memory' 的(简单)冯诺依曼架构的想法对于大多数现代机器来说是错误的,您需要知道数据在 caches/threads 之间正确共享。
也正如其他人所建议的,为每个任务创建一个线程是一个糟糕的模型。如果提交的任务太多,它的扩展性很差,并且会使您的应用程序容易崩溃。内存有一些限制,所以你一次只能有这么多挂起的任务,但线程的上限会低得多。
这将变得更糟,因为你在等待。自旋等待将线程置于循环中等待条件。更好的模型会在 ConditionVariable
上等待,因此操作系统可以暂停不做任何事情(除了等待)的线程,直到通知它们正在等待的东西已经(或可能)准备就绪。
创建和销毁线程通常需要大量的时间和资源开销。考虑到大多数平台可以同时只执行相对较少的线程,创建大量 'expensive' 线程以让它们花费大部分时间换出(挂起)什么都不做是非常低效的。
正确的模型启动一个固定数量(或相对固定数量)的线程池,并将任务放置在线程 'take' 从中工作和处理的共享队列中。
该模型通常称为 "Thread Pool"。
您应该查看的入门级实现是 ThreadPoolExecutor
:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html