在 Akka 流中连接两个流
Concatinating two Flows in Akka stream
我正在尝试连接两个流,但我无法解释我的实施输出。
val source = Source(1 to 10)
val sink = Sink.foreach(println)
val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)
val flowGraph = Flow.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Int](2))
broadcast ~> flow1 ~> concat.in(0)
broadcast ~> flow2 ~> concat.in(1)
FlowShape(broadcast.in, concat.out)
}
)
source.via(flowGraph).runWith(sink)
我希望这段代码有以下输出。
2
3
4
.
.
.
11
10
20
.
.
.
100
相反,我只看到打印了“2”。你能解释一下我的实现有什么问题吗?我应该如何更改程序以获得所需的输出。
来自 Akka Stream 的 API 文档:
Emits when the current stream has an element available; if the current input completes, it tries the next one
Emits when all of the outputs stops backpressuring and there is an input element available
这两个运算符不会一起工作,因为它们的工作方式存在冲突 -- Concat
尝试在切换到 Broadcast
的输出之一之前提取所有元素另一个,而 Broadcast
不会发射,除非对其所有输出都有需求。
对于您需要的内容,您可以按照评论者的建议使用 concat
进行连接:
source.via(flow1).concat(source.via(flow2)).runWith(sink)
或等效地,使用 Source.combine
如下所示:
Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
使用GraphDSL
,这是Source.combine:
实现的简化版本
val sg = Source.fromGraph(
GraphDSL.create(){ implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
source ~> flow1 ~> concat
source ~> flow2 ~> concat
SourceShape(concat.out)
}
)
sg.runWith(sink)
我正在尝试连接两个流,但我无法解释我的实施输出。
val source = Source(1 to 10)
val sink = Sink.foreach(println)
val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)
val flowGraph = Flow.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Int](2))
broadcast ~> flow1 ~> concat.in(0)
broadcast ~> flow2 ~> concat.in(1)
FlowShape(broadcast.in, concat.out)
}
)
source.via(flowGraph).runWith(sink)
我希望这段代码有以下输出。
2
3
4
.
.
.
11
10
20
.
.
.
100
相反,我只看到打印了“2”。你能解释一下我的实现有什么问题吗?我应该如何更改程序以获得所需的输出。
来自 Akka Stream 的 API 文档:
Emits when the current stream has an element available; if the current input completes, it tries the next one
Emits when all of the outputs stops backpressuring and there is an input element available
这两个运算符不会一起工作,因为它们的工作方式存在冲突 -- Concat
尝试在切换到 Broadcast
的输出之一之前提取所有元素另一个,而 Broadcast
不会发射,除非对其所有输出都有需求。
对于您需要的内容,您可以按照评论者的建议使用 concat
进行连接:
source.via(flow1).concat(source.via(flow2)).runWith(sink)
或等效地,使用 Source.combine
如下所示:
Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
使用GraphDSL
,这是Source.combine:
val sg = Source.fromGraph(
GraphDSL.create(){ implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
source ~> flow1 ~> concat
source ~> flow2 ~> concat
SourceShape(concat.out)
}
)
sg.runWith(sink)