我们可以并行处理每个多流水线步骤吗?

Can we get parallel processing of each of Multi pipeline steps?

假设我们有:

现在我们有 3 个步骤。有没有办法让这些步骤 运行 并行?我的意思是一段时间后它应该:抓取 HTML 并同时处理 html + 获取标签内容,同时将数据从已经处理过的项目保存到数据库中。(希望我的意思很明显)这我们可以进行并行处理的方式。默认情况下,我可以看到,mutiny 以串行方式执行。

这是一个例子:

  @Test
  public void test3() {
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
            .onItem().transform(i -> trans(i, "-step1"))
            .onItem().transform(i -> trans(i, "-step2"))
            .onItem().transform(i -> trans(i, "-step3"))
            .subscribe().with(item -> System.out.println("Subscriber received " + item));
  }


  private String trans(String s, String add) {
    int t = new Random().nextInt(4) * 1000;
    try {
      print("Sleeping for '" + s + "' miliseconds: " + t);
      Thread.sleep(t);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    return s + add;
  }

现在报告以下控制台输出:

Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3

可以看出它不是运行并发的。我在这里错过了什么?

这是预期的,Multi 将项目作为流处理。

如果您想进行并行操作(例如,启动 10 个 HTTP 请求),您应该合并 Uni,请参阅 https://smallrye.io/smallrye-mutiny/guides/combining-items

如@jponge 所述,您可以在一些 List<Uni<String>> 中收集您的物品 然后调用

Uni.combine().all().unis(listOfUnis).onitem().subscribe().with()

List<Uni<String>> listOfUnis = new ArrayList<>();
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
            .onItem().invoke(i -> listOfUnis.add(trans(i, "-step1")))
            .onItem().invoke(i -> listOfUnis.add(trans(i, "-step2")))
            .onItem().invoke(i -> listOfUnis.add(trans(i, "-step3")))
// do not subscribe on Multis here

这里还要注意一点 - 如果您要进行 HTTP 调用,最好添加

.emitOn(someBlockingPoolExecutor)

因为您不想阻塞等待 http 调用完成的 Netty 线程