How to provide fairness to users in multithreading

In our application we have a thread pool of size 50. Multiple users log in to the application and threads are allocated to them. The number of threads a user consumes essentially depends on how much data that user is trying to load. Now the problem is: when a heavy-traffic user logs in, it starts consuming 40 threads, leaving the other light-traffic users waiting. We would like a mechanism that gives users some fairness, so that no single user can consume all the threads. Can you suggest a smart solution for this?

Use semaphores to control which threads must wait/sleep and which may continue. The more users are logged in, the more user threads are put to sleep.

Once a thread finishes, you can wake up a sleeping one.

If you can change the server settings, allow up to 50 threads per user and then put them to sleep when needed. That way one user can run at full speed, and you can slow him down on a fair basis when others arrive. Sleeping means the thread pauses and stops using CPU time until it is woken up.
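A minimal sketch of that semaphore idea, assuming a fixed per-user cap (the PerUserThrottle class and the perUserPermits figure are made up for illustration, not part of the original answer):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class PerUserThrottle {
    private final int perUserPermits; // assumed cap, e.g. 10 of the 50 pool threads per user
    private final Map<Integer, Semaphore> semaphores = new ConcurrentHashMap<>();

    PerUserThrottle(int perUserPermits) {
        this.perUserPermits = perUserPermits;
    }

    void runForUser(int userId, Runnable work) throws InterruptedException {
        Semaphore s = semaphores.computeIfAbsent(userId, id -> new Semaphore(perUserPermits, true));
        s.acquire(); // the calling thread sleeps here while the user is over his limit
        try {
            work.run();
        } finally {
            s.release(); // a finishing task wakes up one waiting thread of the same user
        }
    }
}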

You could use a rate limiter. Guava has one: http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/RateLimiter.html

For example, you could limit the number of operations a user can trigger per second, leaving room for other users.
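A sketch of what that could look like with one RateLimiter per user (the 5 permits per second is an arbitrary pick):

import com.google.common.util.concurrent.RateLimiter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class UserRateLimiters {
    private final Map<Integer, RateLimiter> limiters = new ConcurrentHashMap<>();

    void throttle(int userId) {
        // one limiter per user; 5.0 operations per second is just an example figure
        RateLimiter limiter = limiters.computeIfAbsent(userId, id -> RateLimiter.create(5.0));
        limiter.acquire(); // blocks until this user's next permit becomes available
    }
}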

I'm not sure there is any out-of-the-box solution for this scenario, but you could implement it along the lines below (untested, so bear with me if it doesn't work perfectly).

A user request would look something like this:

class UserRequest implements Runnable {

    private final int userId;

    public UserRequest(int userId) {
        this.userId = userId;
    }

    public void run() {
        // process the request
    }

    public int getUserId() {
        return userId;
    }
}

And the server would look like this:

import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class FairServer {

    private final int maxActiveRequests;
    private final int maxWaitingRequests;
    private final int minActiveRequestPerUser;
    private final int maxActiveRequestsPerUser;
    private final AtomicInteger currentTotalActiveRequests;
    private final Map<Integer, AtomicInteger> currentActiveRequestsPerUser;
    private final BlockingQueue<UserRequest> waitingQueue;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Lock lock;
    private final AtomicInteger currentLimitPerUser;

    public FairServer(int maxActiveRequests, int maxWaitingRequests, int minActiveRequestPerUser, int maxActiveRequestsPerUser) {
        this.maxActiveRequests = maxActiveRequests;
        this.maxWaitingRequests = maxWaitingRequests;
        this.minActiveRequestPerUser = minActiveRequestPerUser;
        this.maxActiveRequestsPerUser = maxActiveRequestsPerUser;
        this.currentLimitPerUser = new AtomicInteger(0);
        this.currentTotalActiveRequests = new AtomicInteger(0);
        this.currentActiveRequestsPerUser = new ConcurrentHashMap<Integer, AtomicInteger>(); // concurrent map: the queue comparator reads it without holding the lock
        this.waitingQueue = new PriorityBlockingQueue<UserRequest>(maxWaitingRequests, new UserRequestComparator());
        this.lock = new ReentrantLock();
        // core == max with timeout: with corePoolSize 0 and an unbounded queue,
        // the pool would never grow beyond a single thread
        this.threadPoolExecutor = new LocalThreadPoolExecutor(maxActiveRequests, maxActiveRequests, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        this.threadPoolExecutor.allowCoreThreadTimeOut(true);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.scheduledExecutorService.scheduleWithFixedDelay(new FairnessManager(), 1L, 1L, TimeUnit.SECONDS);
    }

    public void submitUserRequest(UserRequest userRequest) {
        if (waitingQueue.size() >= maxWaitingRequests) {
            throw new RejectedExecutionException("Max waiting requests limit reached");
        }
        if (currentTotalActiveRequests.get() < maxActiveRequests) {
            lock.lock();
            try {
                int users = Math.max(1, currentActiveRequestsPerUser.size());
                int currentLimit = maxActiveRequests / users;
                // clamp the dynamic per-user limit between the configured min and max
                currentLimitPerUser.set(Math.max(minActiveRequestPerUser, Math.min(maxActiveRequestsPerUser, currentLimit)));
                trySubmit(userRequest);
            } finally {
                lock.unlock();
            }
        } else {
            // add request to waiting queue and let FairnessManager handle it
            waitingQueue.add(userRequest);
        }

    }

    private void trySubmit(UserRequest userRequest) {
        // directly submit to thread pool executor if less load overall and per user
        AtomicInteger counter = currentActiveRequestsPerUser.get(userRequest.getUserId());
        if (currentTotalActiveRequests.get() < maxActiveRequests && (counter == null || counter.get() < currentLimitPerUser.get())) {
            currentTotalActiveRequests.incrementAndGet();
            if (counter == null) {
                currentActiveRequestsPerUser.put(userRequest.getUserId(), (counter = new AtomicInteger(0)));
            }
            counter.incrementAndGet();
            // execute(), not submit(): submit() wraps the task in a FutureTask,
            // so afterExecute below would never see a UserRequest
            threadPoolExecutor.execute(userRequest);
        } else {
            // add request to waiting queue and let FairnessManager handle it
            waitingQueue.add(userRequest);
        }
    }

    private class UserRequestComparator implements Comparator<UserRequest> {
        @Override
        public int compare(UserRequest o1, UserRequest o2) {
            AtomicInteger count1 = currentActiveRequestsPerUser.get(o1.getUserId());
            AtomicInteger count2 = currentActiveRequestsPerUser.get(o2.getUserId());
            int c1 = count1 == null ? 0 : count1.get(); // null means no active requests by this user
            int c2 = count2 == null ? 0 : count2.get();
            return Integer.compare(c1, c2); // user with fewer active requests moves up the queue
        }
    }

    private class FairnessManager implements Runnable {
        public void run() {
            if (!waitingQueue.isEmpty() && currentTotalActiveRequests.get() < maxActiveRequests) {
                lock.lock();
                try {
                    int maxIterations = 5; // just to avoid endless attempts
                    UserRequest userRequest;
                    while (maxIterations-- > 0 && (userRequest = waitingQueue.poll()) != null) {
                        trySubmit(userRequest);
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    private class LocalThreadPoolExecutor extends ThreadPoolExecutor {
        public LocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (r instanceof UserRequest) {
                currentTotalActiveRequests.decrementAndGet();
                int userId = ((UserRequest) r).getUserId();
                lock.lock();
                try {
                    int count = currentActiveRequestsPerUser.get(userId).decrementAndGet();
                    if (count == 0) {
                        currentActiveRequestsPerUser.remove(userId);
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

A user request is submitted to the server via submitUserRequest(UserRequest userRequest), which either hands it straight to the thread pool executor or, when there are already too many requests in the server from the same user or overall, makes the request wait. You need to define the minimum and maximum numbers of requests per user, and the server then dynamically adjusts the per-user limit based on the current load. The server has an internal thread that drains the waiting queue.
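A hypothetical way to drive it, with all sizes below being arbitrary picks:

public class FairServerDemo {
    public static void main(String[] args) {
        // pool of 50 threads, up to 200 waiting requests, 5 to 20 active requests per user
        FairServer server = new FairServer(50, 200, 5, 20);
        for (int i = 0; i < 100; i++) {
            server.submitUserRequest(new UserRequest(i % 3)); // three users competing for the pool
        }
    }
}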

I think the best option is to use a PriorityBlockingQueue as the work queue of the ThreadPoolExecutor, and wrap each task in an object that knows how many requests its user is trying to load, so that a task whose count is lower than the others is given higher priority:

public class PriorityTask implements Comparable<PriorityTask>, Runnable {
    private final Runnable task;
    private final int request;

    public PriorityTask(Runnable task, int request) {
        this.task = task;
        this.request = request;
    }

    public void run() {
        task.run();
    }

    @Override
    public int compareTo(PriorityTask other) {
        // tasks from users with fewer requests sort first
        return Integer.compare(this.request, other.request);
    }
}
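Wiring that into a pool could look like the sketch below. Note that tasks have to go through execute(), not submit(): submit() wraps each task in a FutureTask, which is not Comparable, and the PriorityBlockingQueue would throw a ClassCastException.

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PriorityPoolDemo {
    public static void main(String[] args) {
        // 2 threads only, so the queue backs up and the ordering becomes visible
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>());

        // hypothetical load figures: a light user (3 requests) and a heavy one (40)
        pool.execute(new PriorityTask(() -> System.out.println("heavy user task"), 40));
        pool.execute(new PriorityTask(() -> System.out.println("light user task"), 3));
        pool.shutdown();
    }
}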

So heavy-load users back off in favor of light-load users. This will obviously starve the heavy-load users. To avoid starvation you could also add a Date field which says: if this task has not executed within 10 seconds (a number I picked at random), execute it anyway, heavy-load user or not. See the sketch below.
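One way to sketch that aging rule (the AgingPriorityTask name and the 10-second cutoff are illustrative):

public class AgingPriorityTask implements Comparable<AgingPriorityTask>, Runnable {
    private static final long MAX_WAIT_MILLIS = 10_000; // the arbitrary 10-second cutoff

    private final Runnable task;
    private final int request;
    private final long submittedAt = System.currentTimeMillis();

    public AgingPriorityTask(Runnable task, int request) {
        this.task = task;
        this.request = request;
    }

    public void run() {
        task.run();
    }

    @Override
    public int compareTo(AgingPriorityTask other) {
        boolean thisOverdue = age() > MAX_WAIT_MILLIS;
        boolean otherOverdue = other.age() > MAX_WAIT_MILLIS;
        if (thisOverdue != otherOverdue) {
            return thisOverdue ? -1 : 1; // an overdue task jumps the queue regardless of load
        }
        return Integer.compare(this.request, other.request);
    }

    private long age() {
        return System.currentTimeMillis() - submittedAt;
    }
}

Since a PriorityBlockingQueue only orders elements as they are inserted and removed, a clock-dependent comparator is approximate; periodically re-offering long-waiting tasks is one way to make the cutoff firm.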

This is by no means perfect, but it may help you move in a better direction.