Project Reactor 中的 Akka 流 .via() 等价物是什么?

What is the Akka streams .via() equivalent in Project Reactor?

我对 Project Reactor 文档有点头疼。我在 Akka Streams 方面有一些经验,但现在我正在从事一个使用 Project Reactor 的项目。

我需要一个能够接受序列以传递消息的 Reactor 运算符。它的行为需要类似于 Akka Streams 中的 .via() 运算符。

例如,假设我们有一个序列:A -> B -> C,我需要在 B 步骤之后注入序列 X1 -> X2 -> X3。所以最后的顺序是 A -> B -> X1 -> X2 -> X3 -> C.

Reactor 中有这样的东西吗?

Project Reactor 中接近 via 的是 transform 方法。

所以在 Akka 中假设你有这个图表:

Source.single(10)
      .map(_ * -1) //some mapping
      .runWith(Sink.ignore)

然后你有这个流程:

val flow = Flow[Int].map(_ * 2)

您可以像这样将该流插入到您的图形中:

Source.single(10)
      .map(_ * -1)
      .via(flow)
      .runWith(Sink.ignore)

Project Reactor 中的等价物是这样的:

有图表:

 Flux.just(10)
     .map(x -> x * -1)
     .subscribe();

以及将 Flux<Integer> 转换为 Publisher<Integer> 的方法:

public static class Transformers
{
  public static Publisher<Integer> flow(Flux<Integer> f)
  {
    return f.map(x -> x * 2);
  }
}

您可以像这样将该方法插入到您的图形中:

Flux.just(10)
    .map(x -> x * -1)
    .transform(Transformers::flow)
    .subscribe();

我写了一个 post 关于这两个 API 之间的区别和其他区别的文章,也许您会发现它很有用。这个 post 来自 2019 年,API 不断发展。例如,我在 Flux 的上下文中提到了 compose 方法,自从我写这篇文章以来,它已重命名为 transformDeferred,我不确定自从我编写 post 之后还有什么变化,所以请注意:Akka Streams vs Project Reactor API