How to provide fairness to users in multithreading
In our application we have a thread pool of size 50. Multiple users log in to the application and threads are allocated to them. The number of threads a user consumes basically depends on the amount of data the user is trying to load. Now the problem is this:
When a high-volume user logs in, he starts consuming 40 of the threads, leaving the other low-volume users waiting. We would like a mechanism that gives users some fairness, so that a single user cannot consume all the threads. Can you suggest a smart solution for this?
Use semaphores to control which threads must wait/sleep and which may proceed.
The more users are logged in, the more user threads will be sleeping.
When a thread finishes, you can wake up one of the sleeping threads.
If you can change the server settings, allow 50 threads per user and then put them to sleep when needed. That way you can let a single user run at full speed and slow him down on a fair basis. Sleeping means the thread is paused and stops consuming CPU time until it is woken up.
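For illustration, a minimal sketch of that idea (the per-user cap of 10, the cached pool, and the class name PerUserThrottle are my assumptions, not part of the original suggestion):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

class PerUserThrottle {
    private static final int PER_USER_LIMIT = 10; // arbitrary cap per user
    // generously sized pool, because throttled tasks sleep inside it
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Map<Integer, Semaphore> permits = new ConcurrentHashMap<>();

    void submit(int userId, Runnable task) {
        // fair semaphore, so a user's waiting tasks wake in FIFO order
        Semaphore s = permits.computeIfAbsent(userId, id -> new Semaphore(PER_USER_LIMIT, true));
        pool.submit(() -> {
            try {
                s.acquire(); // the user's 11th task sleeps here, using no CPU
                try {
                    task.run();
                } finally {
                    s.release(); // wakes one of this user's sleeping tasks
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}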
You could use a rate limiter. Guava has one:
http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/RateLimiter.html
For example, you can limit the number of operations a user may trigger per second, leaving room for the other users.
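For illustration, a per-user limiter could look like this (the 5 permits per second and the class name PerUserRateLimit are my assumptions):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.util.concurrent.RateLimiter;

class PerUserRateLimit {
    // one limiter per user, so a single user's burst cannot crowd out the rest
    private final Map<Integer, RateLimiter> limiters = new ConcurrentHashMap<>();

    void onRequest(int userId, Runnable action) {
        RateLimiter limiter = limiters.computeIfAbsent(userId, id -> RateLimiter.create(5.0)); // 5 ops/sec, arbitrary
        limiter.acquire(); // blocks until this user has a free permit
        action.run();
    }
}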
I am not aware of any ready-made solution for this scenario, but you could implement it along the following lines (untested, so bear with me if it does not work perfectly).
A user request would look something like this:
class UserRequest implements Runnable {
    private final int userId;

    public UserRequest(int userId) {
        this.userId = userId;
    }

    public void run() {
        // process the request
    }

    public int getUserId() {
        return userId;
    }
}
And the server itself looks like this:
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class FairServer {
    private final int maxActiveRequests;
    private final int maxWaitingRequests;
    private final int minActiveRequestPerUser;
    private final int maxActiveRequestsPerUser;
    private final AtomicInteger currentTotalActiveRequests;
    // concurrent map: the waiting queue's comparator reads it without the lock
    private final Map<Integer, AtomicInteger> currentActiveRequestsPerUser;
    private final BlockingQueue<UserRequest> waitingQueue;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Lock lock;
    private final AtomicInteger currentLimitPerUser;

    public FairServer(int maxActiveRequests, int maxWaitingRequests,
                      int minActiveRequestPerUser, int maxActiveRequestsPerUser) {
        this.maxActiveRequests = maxActiveRequests;
        this.maxWaitingRequests = maxWaitingRequests;
        this.minActiveRequestPerUser = minActiveRequestPerUser;
        this.maxActiveRequestsPerUser = maxActiveRequestsPerUser;
        this.currentLimitPerUser = new AtomicInteger(0);
        this.currentTotalActiveRequests = new AtomicInteger(0);
        this.currentActiveRequestsPerUser = new ConcurrentHashMap<Integer, AtomicInteger>();
        this.waitingQueue = new PriorityBlockingQueue<UserRequest>(maxWaitingRequests, new UserRequestComparator());
        this.lock = new ReentrantLock();
        // core size must equal max size: with an unbounded work queue the pool
        // never grows beyond its core size; core threads still time out below
        this.threadPoolExecutor = new LocalThreadPoolExecutor(maxActiveRequests, maxActiveRequests,
                60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        this.threadPoolExecutor.allowCoreThreadTimeOut(true);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        // periodically drain the waiting queue back into the pool
        this.scheduledExecutorService.scheduleWithFixedDelay(new FairnessManager(), 1L, 1L, TimeUnit.SECONDS);
    }

    public void submitUserRequest(UserRequest userRequest) {
        if (waitingQueue.size() >= maxWaitingRequests) {
            throw new RuntimeException("Max limit reached");
        }
        if (currentTotalActiveRequests.get() < maxActiveRequests) {
            lock.lock();
            try {
                // recompute the fair per-user share, clamped to [min, max]
                int activeUsers = Math.max(1, currentActiveRequestsPerUser.size());
                int currentLimit = (int) Math.ceil((double) maxActiveRequests / activeUsers);
                currentLimitPerUser.set(Math.max(minActiveRequestPerUser,
                        Math.min(maxActiveRequestsPerUser, currentLimit)));
                trySubmit(userRequest);
            } finally {
                lock.unlock();
            }
        } else {
            // add request to waiting queue and let FairnessManager handle it
            waitingQueue.add(userRequest);
        }
    }

    private void trySubmit(UserRequest userRequest) {
        // submit directly to the pool only if both the overall and the
        // per-user load are below their limits; must be called under the lock
        AtomicInteger counter = currentActiveRequestsPerUser.get(userRequest.getUserId());
        if (currentTotalActiveRequests.get() < maxActiveRequests
                && (counter == null || counter.get() < currentLimitPerUser.get())) {
            currentTotalActiveRequests.incrementAndGet();
            if (counter == null) {
                counter = new AtomicInteger(0);
                currentActiveRequestsPerUser.put(userRequest.getUserId(), counter);
            }
            counter.incrementAndGet();
            // execute(), not submit(): submit() would wrap the task in a
            // FutureTask, and afterExecute() would no longer see a UserRequest
            threadPoolExecutor.execute(userRequest);
        } else {
            // add request to waiting queue and let FairnessManager handle it
            waitingQueue.add(userRequest);
        }
    }

    private class UserRequestComparator implements Comparator<UserRequest> {
        @Override
        public int compare(UserRequest o1, UserRequest o2) {
            // users with fewer active requests go to the front of the queue;
            // a missing counter means the user has no active requests at all
            AtomicInteger count1 = currentActiveRequestsPerUser.get(o1.getUserId());
            AtomicInteger count2 = currentActiveRequestsPerUser.get(o2.getUserId());
            int c1 = count1 == null ? 0 : count1.get();
            int c2 = count2 == null ? 0 : count2.get();
            return Integer.compare(c1, c2);
        }
    }

    private class FairnessManager implements Runnable {
        public void run() {
            if (!waitingQueue.isEmpty() && currentTotalActiveRequests.get() < maxActiveRequests) {
                lock.lock();
                try {
                    int maxIterations = 5; // just to avoid endless attempts
                    UserRequest userRequest;
                    while (maxIterations-- > 0 && (userRequest = waitingQueue.poll()) != null) {
                        trySubmit(userRequest);
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    private class LocalThreadPoolExecutor extends ThreadPoolExecutor {
        public LocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                       TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            // release the finished request's slot in the per-user bookkeeping
            super.afterExecute(r, t);
            if (r instanceof UserRequest) {
                currentTotalActiveRequests.decrementAndGet();
                int userId = ((UserRequest) r).getUserId();
                lock.lock();
                try {
                    int count = currentActiveRequestsPerUser.get(userId).decrementAndGet();
                    if (count == 0) {
                        currentActiveRequestsPerUser.remove(userId);
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
User requests are submitted to the server via submitUserRequest(UserRequest userRequest), which either hands them directly to the thread pool executor, or makes them wait when the server already holds too many requests, either from the same user or overall. You need to define the minimum and maximum number of requests per user, and the server then adjusts the per-user limit dynamically based on the current load.
The server has an internal thread that drains the waiting queue.
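For example, it might be wired up like this (all parameter values are arbitrary):

// 50 pool threads, at most 500 waiting requests, and each user is
// dynamically limited to between 2 and 10 concurrent requests
FairServer server = new FairServer(50, 500, 2, 10);
server.submitUserRequest(new UserRequest(42)); // runs now or waits its turn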
I think your best bet is to use a PriorityBlockingQueue as the work queue of the ThreadPoolExecutor, and to wrap each task in an object that knows how many requests its user is trying to load, so that a task whose count is lower than the others gets a higher priority:
public class PriorityTask implements Comparable<PriorityTask>, Runnable {
    private final Runnable task;
    private final int request; // how much the submitting user is currently loading

    public PriorityTask(Runnable task, int request) {
        this.task = task;
        this.request = request;
    }

    public void run() {
        task.run();
    }

    public int compareTo(PriorityTask other) {
        // fewer requests means higher priority (earlier in the queue)
        return Integer.compare(this.request, other.request);
    }
}
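Wiring the pool could then look like this (the pool size of 50 comes from the question; the rest is illustrative). One caveat: tasks must go in through execute(), because submit() wraps them in a FutureTask, which is not Comparable and would break the priority queue.

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityPoolDemo {
    public static void main(String[] args) {
        // core == max: with an unbounded work queue the pool never grows
        // beyond its core size, so the core must already be the full 50
        ThreadPoolExecutor pool = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
        // execute(), not submit(), for the reason given above
        pool.execute(new PriorityTask(() -> System.out.println("light user"), 1));
        pool.execute(new PriorityTask(() -> System.out.println("heavy user"), 40));
        pool.shutdown();
    }
}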
This way, high-load users back off in favour of low-load users. Obviously this can starve the high-load users. To avoid starvation, you could also add a Date field that says: if this task has not executed within 10 seconds (an arbitrary time I picked), execute it anyway, even though it comes from a high-load user.
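A sketch of that tweak, assuming PriorityTask gains a long submittedAt = System.currentTimeMillis() field (the field name and the 10-second figure are mine):

// overdue tasks jump the queue regardless of their user's load
public int compareTo(PriorityTask other) {
    long now = System.currentTimeMillis();
    boolean thisOverdue = now - this.submittedAt > 10_000L; // 10s, arbitrary
    boolean otherOverdue = now - other.submittedAt > 10_000L;
    if (thisOverdue != otherOverdue) {
        return thisOverdue ? -1 : 1; // the overdue task goes first
    }
    return Integer.compare(this.request, other.request);
}

Bear in mind that PriorityBlockingQueue only evaluates priorities when elements are inserted or removed, so the ordering is only approximate when priorities change over time while tasks sit in the queue.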
This is not necessarily perfect, but it may nudge you in a better direction.