subscribeOn(Schedulers.parallel()) 不工作

subscribeOn(Schedulers.parallel()) is not working

我正在学习 Reactor Core 并遵循这个 https://www.baeldung.com/reactor-core

ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(arrList::add);

System.out.println("After: " + arrList);

当我执行上面的代码行时,给出了。

 Before: []
 [DEBUG] (main) Using Console logging
 After: []

以上代码行应该在另一个线程中开始执行,但它根本不起作用。 有人可以帮我解决这个问题吗?

我认为有些混乱。当您调用 subscribeOn(Schedulers.parallel()) 时。您指定要在不同线程上接收项目。此外,您还必须放慢代码速度,以便订阅 cen 真正启动(这就是我添加 Thread.sleep(100) 的原因)。如果您 运行 我传递的代码有效。你看reactor里面没有神奇的同步机制。

    ArrayList<Integer> arrList = new ArrayList<Integer>();

    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .subscribe(
                    t -> {
                        System.out.println(t + " thread id: " + Thread.currentThread().getId());
                        arrList.add(t);
                    }
            );
    System.out.println("size of arrList(before the wait): " + arrList.size());
    System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
    Thread.sleep(100);
    System.out.println("size of arrList(after the wait): " + arrList.size());

如果你想将你的项目添加到并行反应器列表中不是一个好的选择。最好在 java 8.

中使用并行流
List<Integer> collect = Stream.of(1, 2, 3, 4)
                .parallel()
                .map(i -> i * 2)
                .collect(Collectors.toList());

您 post 编辑的教程在并发部分不是很准确。作者致谢 he/she 表示以后还会有更多文章。但我认为 post 那个特定示例根本不应该这样做,因为它会造成混淆。我建议不要太相信互联网上的资源:)

如 Reactor 文档中针对各种 subscribe 方法所述:

Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

这意味着到达了 main 方法的末尾,因此主线程在任何线程能够订阅 Reactive 链之前退出,正如 Piotr 所提到的。

您要做的是等到整个链完成后再打印数组的内容。

最简单的做法是:

    ArrayList<Integer> arrList = new ArrayList<>();
    System.out.println("Before: " + arrList);
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(arrList::add)
            .blockLast();

    System.out.println("After: " + arrList);

在这里,您将阻止主线程上的执行,直到处理完 Flux 上的最后一个元素。因此,最后一个 System.out 将不会执行,直到您的 ArrayList 完全填充。

请记住,控制台应用程序中的代码 运行 与 Netty 等服务器环境中的代码方式略有不同。使控制台应用程序等待所有订阅启动的唯一方法是 block.

但是并行线程不允许阻塞。因此,这种方法不适用于 Netty 环境。在明确关闭之前,您的服务器将处于 运行ning 状态,因此 subscribe 就可以了。

但是,在上面的代码片段中,您进行阻塞不仅是为了防止应用程序退出,也是为了等待您读取已填充的数据。

对上述代码的改进如下:

    ArrayList<Integer> arrList = new ArrayList<>();
    System.out.println("Before: " + arrList);
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(arrList::add)
            .doOnComplete(() -> System.out.println("After: " + arrList))
    .blockLast();

即使在这里,doOnComplete 也会从反应链外部访问数据。为防止这种情况,您将在链本身中收集 Flux 的元素,如下所示:

    System.out.println("Before.");
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .collectList()
            .doOnSuccess(list -> System.out.println("After: " + list))
    .block();

同样,请记住当 运行 在 Netty 中(比如 Spring Webflux 应用程序)时,上面的代码将以 subscribe().

结尾

但是请注意,从 Flux 切换到 List(或任何 Collection)意味着您正在从反应式范式切换到命令式编程。您应该能够在 Reactive 范例本身内实现任何功能。