反应式 Flux 编程中的并行执行

Parallel execution in reactive Flux programming

我正在尝试避免使用阻塞线程模型并使用反应模型来实现高吞吐量。 我的用例是这样的:有大量传入消息。对于每条消息,我都需要做一些 I/O 而不会阻塞该线程。在这里,我在单独的线程中处理每条消息。如果应用程序被终止,我需要完成正在进行的任务并优雅地关闭。我在下面使用 Thread.sleep 来模拟密集的 I/O 操作。 代码示例如下:

public class TestReactor {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Disposable task = Flux.range(1, 100).parallel().runOn(Schedulers.fromExecutor(executor)).doOnNext(message -> {
            System.out.println(Thread.currentThread().getName()+": processing " + message);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+": Done");
        })
        .sequential()
        .doOnError(e->e.printStackTrace())
        .doOnCancel(()->{
            System.out.println("disposing.....");
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        })
        .subscribe();

        Thread.sleep(4000);
        
        task.dispose();
        
        System.out.println("Disposed. Waiting for sometime before exit.");
        Thread.sleep(20000);
    }

}

当我 运行 这样做时,通量似乎忽略了 executor.shutdown() 并因中断异常而出错。是否可以使用 flux 实现我的用例?

添加输出:

你犯了一个大错误:在使用响应式编程时,不要使用任何线程操作。这是一个肮脏的黑客。当您尝试在 FRP 上编写代码时,糟糕设计和代码异味的最明显标记是:

  • 在运算符和副作用函数 (doOn) 中调用 try-catch 块。如果您有例外情况 - 好的,没关系。调用 onError 运算符并在其中处理管道的行为。
  • 在 Mono/Flux 中使用 Thread.sleep 和其他并发 API。 Weblfux API 也有很好的 API 来处理并发和多线程。

因此,关于问题及其解决方案:您的设计很糟糕 :) 您不需要处理任何一次性用品或线程中断。你只需要删除它并添加 killswitch。

所以,工作代码,它可以满足您的需求:

public class TestReactor {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Flux.generate(() -> 0,
                (state, sink) -> {
                    sink.next("current state = " + state);
                    if (state == 100) sink.complete();
                    return state + 1;
                }
        )
        .parallel()
        .runOn(Schedulers.fromExecutor(executor))
        .doOnNext(message -> {
            System.out.println(Thread.currentThread().getName() + ": processing " + message);
            System.out.println(Thread.currentThread().getName() + ": Done");
        })
        .sequential()
        .doOnError(e -> e.printStackTrace())
        .doOnCancel(() -> {
            System.out.println("disposing.....");
        })
        .subscribe();
    }

}