Project Reactor 中的 Akka 流 .via() 等价物是什么?
What is the Akka streams .via() equivalent in Project Reactor?
我对 Project Reactor 文档有点头疼。我在 Akka Streams 方面有一些经验,但现在我正在从事一个使用 Project Reactor 的项目。
我需要一个能够接受序列以传递消息的 Reactor 运算符。它的行为需要类似于 Akka Streams 中的 .via() 运算符。
例如,假设我们有一个序列:A -> B -> C,我需要在 B 步骤之后注入序列 X1 -> X2 -> X3。所以最后的顺序是 A -> B -> X1 -> X2 -> X3 -> C.
Reactor 中有这样的东西吗?
Project Reactor 中接近 via
的是 transform 方法。
所以在 Akka 中假设你有这个图表:
Source.single(10)
.map(_ * -1) //some mapping
.runWith(Sink.ignore)
然后你有这个流程:
val flow = Flow[Int].map(_ * 2)
您可以像这样将该流插入到您的图形中:
Source.single(10)
.map(_ * -1)
.via(flow)
.runWith(Sink.ignore)
Project Reactor 中的等价物是这样的:
有图表:
Flux.just(10)
.map(x -> x * -1)
.subscribe();
以及将 Flux<Integer>
转换为 Publisher<Integer>
的方法:
public static class Transformers
{
public static Publisher<Integer> flow(Flux<Integer> f)
{
return f.map(x -> x * 2);
}
}
您可以像这样将该方法插入到您的图形中:
Flux.just(10)
.map(x -> x * -1)
.transform(Transformers::flow)
.subscribe();
我写了一个 post 关于这两个 API 之间的区别和其他区别的文章,也许您会发现它很有用。这个 post 来自 2019 年,API 不断发展。例如,我在 Flux
的上下文中提到了 compose
方法,自从我写这篇文章以来,它已重命名为 transformDeferred
,我不确定自从我编写 post 之后还有什么变化,所以请注意:Akka Streams vs Project Reactor API
我对 Project Reactor 文档有点头疼。我在 Akka Streams 方面有一些经验,但现在我正在从事一个使用 Project Reactor 的项目。
我需要一个能够接受序列以传递消息的 Reactor 运算符。它的行为需要类似于 Akka Streams 中的 .via() 运算符。
例如,假设我们有一个序列:A -> B -> C,我需要在 B 步骤之后注入序列 X1 -> X2 -> X3。所以最后的顺序是 A -> B -> X1 -> X2 -> X3 -> C.
Reactor 中有这样的东西吗?
Project Reactor 中接近 via
的是 transform 方法。
所以在 Akka 中假设你有这个图表:
Source.single(10)
.map(_ * -1) //some mapping
.runWith(Sink.ignore)
然后你有这个流程:
val flow = Flow[Int].map(_ * 2)
您可以像这样将该流插入到您的图形中:
Source.single(10)
.map(_ * -1)
.via(flow)
.runWith(Sink.ignore)
Project Reactor 中的等价物是这样的:
有图表:
Flux.just(10)
.map(x -> x * -1)
.subscribe();
以及将 Flux<Integer>
转换为 Publisher<Integer>
的方法:
public static class Transformers
{
public static Publisher<Integer> flow(Flux<Integer> f)
{
return f.map(x -> x * 2);
}
}
您可以像这样将该方法插入到您的图形中:
Flux.just(10)
.map(x -> x * -1)
.transform(Transformers::flow)
.subscribe();
我写了一个 post 关于这两个 API 之间的区别和其他区别的文章,也许您会发现它很有用。这个 post 来自 2019 年,API 不断发展。例如,我在 Flux
的上下文中提到了 compose
方法,自从我写这篇文章以来,它已重命名为 transformDeferred
,我不确定自从我编写 post 之后还有什么变化,所以请注意:Akka Streams vs Project Reactor API