Quasar Fiber 相当于 Java 的 ThreadPoolExecutor?

Quasar Fiber equivalent of Java's ThreadPoolExecutor?

我一直很好奇 Quasar 及其轻量级纤维作为线程的替代品。在咨询他们 API docs, I have not been able to figure out how to go about converting a typical ThreadPoolExecutor 后进入一个纤维池。

int maxThreadPoolSize = 10;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        maxThreadPoolSize,
        maxThreadPoolSize,
        10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

for (int i = 0; i < 100; i++) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // run some code
        }
    });
}

上面的代码创建了一个有 10 个线程的池,池前面的一个队列可以容纳 10 个元素和一个拒绝策略(当队列满时)让主线程自己执行一个 Runnable 任务。当 for 循环创建 100 个 运行nables 时,它们将在池中一次执行 10 个,10 个排队,主线程自己选择一个 Runnable 直到其他人完成,然后主线程返回添加执行者的可运行对象。

你会如何使用 Quasar 的纤维来做到这一点?它一开始就是这样使用的吗?


编辑:我原来的问题措辞不当。本质上,我试图找到一种机制来限制有多少光纤可以同时 运行。例如,如果已经有 200 个纤程 运行ning,就不要再启动纤程。如果光纤的最大数量为 运行ning,请等待一个完成后再启动一个新光纤。

Fiber 非常便宜,因此您根本不需要池(及其异步作业调度模型):只需启动一个 fiber 并让它 运行 每次您需要新的时都使用常规顺序代码顺序过程 运行 与其他过程同时进行。

java.util.concurrent.Semaphore 在我的特定设置中运行良好。

我的解决方案的一般要点:

  • 创建具有所需最大许可数的信号量(又名最大并发纤程)
  • 主线程负责从队列中挑选要处理的任务
  • 主线程调用 semaphore.acquire():
    • 如果许可可用,则启动新的 Fiber 来处理任务
    • 如果获得所有许可,则信号量将阻塞主线程并等待许可可用
  • Fiber 启动后,主线程重复其逻辑。从队列中选取一个新任务并尝试启动一个新的 Fiber。

好处:标准​​Java的信号量是固定的,不能动态调整许可数。为了使其动态化,这个 link 派上用场了:http://blog.teamlazerbeez.com/2009/04/20/javas-semaphore-resizing/

我们刚刚预发布了 kilim 2.0。它提供了一个 fiber 和 actor 实现(类似于类星体)并由 ThreadPoolExecutor

提供支持

限制并发任务数量的最有效方法是让一个任务充当控制器并收听邮箱(我认为 quasar 调用这些通道)并维护 运行 个任务的数量.每个任务完成后,给邮箱留言

一般来说,使用比核心更多的线程是没有意义的

每个纤程都由一个 FiberScheduler 调度,当你创建一个没有调度器的纤程时,一个 FiberForkJoinScheduler 将被创建并分配给这个纤程。

简而言之,如果您想在线程池中管理您的纤程,请使用 FiberExecutorScheduler: Quasar's document about scheduling fibers
你的代码可能是这样的

    int maxThreadPoolSize = 10;
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            maxThreadPoolSize,
            maxThreadPoolSize,
            10, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    FiberExecutorScheduler scheduler = new FiberExecutorScheduler("FibersInAPool", executor);
    for (int i = 0; i < 100; i++) {
        Fiber fiber = new Fiber<Void>(scheduler
                , new SuspendableCallable<Void>() {
            @Override
            public Void run() throws SuspendExecution, InterruptedException {
                // run some code
                return null;
            }
        });
        fiber.start();
    }