Handle back-pressure in FixedThreadPool

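How can I handle back-pressure in Java using a thread pool?

How can I reject new tasks so that no more than N tasks are submitted at any time? N is the maximum number of tasks allowed in the submission queue, which includes new, running, and paused (not finished) tasks.

Use case

Users submit calculation tasks that run for some time. Sometimes many users submit tasks at the same time. How can I reject new tasks once N tasks have already been submitted?

In other words, the total number of submitted tasks (not finished, whether started or not) must not exceed N.

Example code

Here is the full version. Below are short snippets.

A long-running task. CalculationTask.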

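public class CalculationTask {
    // MIN_WAIT_TIME_MS, MAX_WAIT_TIME_MS, RANDOM and sleep() are defined in the full version.
    private final String name;

    public CalculationTask(final String name) {
        this.name = name;
    }

    // Used by CalculationBroker as the cache key.
    public String getName() {
        return name;
    }

    public CalculationResult calculate() {
        // Simulate a long-running calculation with a random delay.
        final long waitTimeMs = MIN_WAIT_TIME_MS + RANDOM.nextInt(MAX_WAIT_TIME_MS);
        sleep(waitTimeMs);
        final int result = Math.abs(RANDOM.nextInt());
        final String text = "This is result: " + result;
        final CalculationResult calculationResult = new CalculationResult(name, text, result);
        System.out.println("Calculation finished: " + calculationResult);
        return calculationResult;
    }
}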

Its result. CalculationResult.

public class CalculationResult {

    private final String taskName;
    private final String text;
    private final Integer number;
    // Getters, setters, constructor, toString.
}

This is how I submit jobs. CalculationBroker.

public class CalculationBroker {

    private static final int MAX_WORKERS_NUMBER = 5;

    private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS_NUMBER);
    private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>();

    public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) {
        final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
        if (calculationResultCached != null) {
            return CompletableFuture.completedFuture(calculationResultCached);
        }

        System.out.println("Calculation submitted: " + calculationTask.getName());

        final CompletableFuture<CalculationResult> calculated = CompletableFuture
                .supplyAsync(calculationTask::calculate, executorService);
        calculated.thenAccept(this::updateCache);
        return calculated;
    }

    private void updateCache(final CalculationResult calculationResult) {
        calculationCache.put(calculationResult.getTaskName(), calculationResult);
    }
}

This is how I run them together. Main.

public class Main {

    public static void main(String[] args) {
        final int N_TASKS = 100;
        final CalculationBroker calculationBroker = new CalculationBroker();
        final List<CompletableFuture<CalculationResult>> completableFutures = new ArrayList<>();
        for (int i = 0; i < N_TASKS; i++) {
            final CalculationTask calculationTask = createCalculationTask(i);
            final CompletableFuture<CalculationResult> calculationResultCompletableFuture =
                    calculationBroker.submit(calculationTask);
            completableFutures.add(calculationResultCompletableFuture);
        }

        calculationBroker.close();
    }

    private static CalculationTask createCalculationTask(final int counter) {
        return new CalculationTask("CalculationTask_" + counter);
    }
}

This is the output.

2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_97.
2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_98.
2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_99.
2020-05-23 14:14:54 [pool-1-thread-3] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1081871544', number=1081871544, durationMs=1066}
2020-05-23 14:14:55 [pool-1-thread-1] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 1942553785', number=1942553785, durationMs=1885}
2020-05-23 14:14:56 [pool-1-thread-5] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_4', text='This is result: 104326011', number=104326011, durationMs=2120}

My findings.

Details below.

The code above is equivalent to Executors.newFixedThreadPool(n); however, instead of the default unbounded LinkedBlockingQueue, it uses an ArrayBlockingQueue with a fixed capacity of 100. This means that if 100 tasks are already queued (and n are being executed), a new task will be rejected with a RejectedExecutionException.

Executors.newFixedThreadPool creates a ThreadPoolExecutor backed by a LinkedBlockingQueue, which is unbounded by default.

As shown in the post above:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
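To make the limit concrete, here is a minimal sketch, assuming a pool whose core and maximum sizes are equal. The class and method names (BoundedPoolDemo, countRejections) are made up for illustration and are not part of the original sources. With n workers and a queue of capacity c, at most N = n + c unfinished tasks are accepted; anything beyond that is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original sources.
public class BoundedPoolDemo {

    /** Submits {@code submitted} blocking tasks and returns how many were rejected. */
    public static int countRejections(int workers, int queueCapacity, int submitted) {
        // No handler is passed, so the default AbortPolicy applies.
        ExecutorService pool = new ThreadPoolExecutor(workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        // Every task waits on this latch, so nothing finishes while we are submitting.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // pool and queue are both full
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 workers + 3 queue slots = 5 accepted, so 5 of 10 are rejected.
        System.out.println("Rejected: " + countRejections(2, 3, 10)); // prints "Rejected: 5"
    }
}
```

So to cap the total at N, one option is to pick the pool size and queue capacity so that they add up to N.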

You answered your own question... you can do this with the queue size.

int poolSize = ...;
int queueSize = ...;
CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();

ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(queueSize),
    handler);

You can use CustomRejectedExecutionHandler to handle rejected tasks.

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.log4j.Logger;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    public static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class);

    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
        // Only logs: the rejected task is dropped and the caller is not notified.
        LOGGER.error(runnable.toString() + " execution rejected.");
    }
}
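One caveat with a handler that only logs: execute() returns normally, so the caller never learns about the rejection, and a CompletableFuture created with supplyAsync on that executor would never complete. A minimal sketch of a handler that logs and then still throws is below. LoggingAbortHandler and its demo() method are hypothetical names, and System.err stands in for the logger to keep the example dependency-free.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: records and logs the rejection, then notifies the caller.
public class LoggingAbortHandler implements RejectedExecutionHandler {

    private final AtomicInteger rejectedCount = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        rejectedCount.incrementAndGet();
        System.err.println(runnable + " execution rejected.");
        // Rethrow so that execute()/supplyAsync() fails at the call site.
        throw new RejectedExecutionException("Task rejected by " + executor);
    }

    public int getRejectedCount() {
        return rejectedCount.get();
    }

    /** Saturates a 1-thread pool with a 1-slot queue and returns the rejection count. */
    public static int demo() {
        LoggingAbortHandler handler = new LoggingAbortHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocking); // taken by the single worker
            pool.execute(blocking); // waits in the queue
            pool.execute(blocking); // rejected: the handler throws
        } catch (RejectedExecutionException expected) {
            // The caller is informed and can report the failure to the user.
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.getRejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("Rejected tasks: " + demo()); // prints "Rejected tasks: 1"
    }
}
```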

Thanks to Hussein, and also to this answer and the documentation. This is how I solved it.

You can check the full sources.

    private final ExecutorService executorService = initializeThreadPoolWithRejection();

    private ExecutorService initializeThreadPoolWithRejection() {
        final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

        return new ThreadPoolExecutor(WORKERS_NUMBER, MAX_WORKERS_NUMBER,
                0L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10 /*queueSize*/),
                handler);
    }

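Note that I use ThreadPoolExecutor.AbortPolicy, which is also the default handler. It rejects a task by throwing a RejectedExecutionException at submission time; in the code below that exception is caught and returned as a failed future, so future.get() reports it as an ExecutionException.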
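As a quick check of which exception the submitting thread actually sees, here is a small sketch. AbortPolicyDemo and submitWhenSaturated are illustrative names, and the pool sizes are arbitrary. It saturates a tiny pool and reports the exception thrown by CompletableFuture.supplyAsync.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original sources.
public class AbortPolicyDemo {

    /** Returns the simple class name of the exception thrown on the third submission. */
    public static String submitWhenSaturated() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // 1st task runs, 2nd is queued, 3rd does not fit and is rejected.
            for (int i = 0; i < 3; i++) {
                CompletableFuture.supplyAsync(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "done";
                }, pool);
            }
            return "no exception";
        } catch (RuntimeException e) {
            // Thrown synchronously, on the submitting thread.
            return e.getClass().getSimpleName();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitWhenSaturated()); // prints "RejectedExecutionException"
    }
}
```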

CalculationBroker

    public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) 
    {
        final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
        if (calculationResultCached != null) {
            return CompletableFuture.completedFuture(calculationResultCached);
        }

        LOGGER.info("Calculation submitted: {}.", calculationTask.getName());

        try {
            final CompletableFuture<CalculationResult> calculated = CompletableFuture
                    .supplyAsync(calculationTask::calculate, executorService);
            calculated.thenAccept(this::updateCache);
            return calculated;
        } catch (Exception e) {
            System.out.println("Failed to submit a task.");
            return CompletableFuture.failedFuture(e);
        }
    }

Usage example in Main:

    private static void completeFuture(final CompletableFuture<CalculationResult> future) {
        final CalculationResult calculationResult;
        try {
            calculationResult = future.get();
            System.out.println("Task is finished: " + calculationResult);
        } catch (InterruptedException e) {
            System.out.println("Task was interrupted. " + e.getMessage());
        } catch (ExecutionException e) {
            System.out.println("Task failed.");
        }
    }

It produces the following output:

2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_15.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_16.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_17.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_20.
Failed to submit a task.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_21.
Failed to submit a task.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_22.
Failed to submit a task.
2020-05-23 16:44:11 [pool-1-thread-8] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_17', text='This is result: 1096770940', number=1096770940, durationMs=1246}
2020-05-23 16:44:11 [pool-1-thread-4] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_3', text='This is result: 2103177010', number=2103177010, durationMs=1814}
2020-05-23 16:44:12 [pool-1-thread-6] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_15', text='This is result: 961885863', number=961885863, durationMs=2632}
Task is finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 79356259', number=79356259, durationMs=3875}
Task is finished: CalculationResult{taskName='CalculationTask_1', text='This is result: 532289460', number=532289460, durationMs=3725}
Task is finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1579151336', number=1579151336, durationMs=3684}
Task failed.
Task failed.
Task failed.

Note that this works only on Java 9+.

CompletableFuture.failedFuture(e) is not available in Java 8.
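On Java 8 the same effect can be had with completeExceptionally, which has been part of CompletableFuture since Java 8. A minimal sketch, with Java8FailedFuture as a made-up helper class name:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical helper, a Java 8 stand-in for CompletableFuture.failedFuture.
public class Java8FailedFuture {

    /** Returns a future that is already completed exceptionally with {@code cause}. */
    public static <T> CompletableFuture<T> failedFuture(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> failed = failedFuture(new IllegalStateException("queue full"));
        try {
            failed.get();
        } catch (ExecutionException e) {
            // get() wraps the original exception; the cause is what was passed in.
            System.out.println("Task failed: " + e.getCause().getMessage()); // prints "Task failed: queue full"
        }
    }
}
```

In the submit method above, the catch block could then return Java8FailedFuture.failedFuture(e) instead of CompletableFuture.failedFuture(e).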