Akka Stream:在图形构建器阶段动态增长网点

Akka Stream : dynamically grow outlets during graph builder phase

我了解到在 Akka Stream 中,一个出口必须连接到一个入口,并且不自动支持将多个接收器连接到同一源。所以你必须插入中间对象,例如 Broadcast.

我正在将信号处理 DAG 转换为 Akka 流图,如果我可以在遍历过程中发现源时动态地将汇添加到源,那将对我有很大帮助。如果我有自定义 GraphStage,我可以拥有自己的 Shape,其 outlets 集合在 Graph.create 阶段动态增长吗?此调用支持正常的 DSL 操作 ~>

b.addEdge(importAndGetPort(b), to)

建造者如何 "get" 这里的 Outlet 我是否能够按需增加形状?


如果这不起作用,是否可以 "eject" 以前的广播,断开其边缘并在图形构建期间用新的更大的广播将它们连接起来?

GraphDSL 不允许动态改变你的形状。

但是,从 Akka 2.4.10 开始,您可以使用 BroadcastHub(和 MergeHub)。

BroadcastHub 可以为您提供一个具体化为源的接收器。 此 Source 可以根据需要多次具体化以动态附加多个订阅者。

所以对于你的 DAG 的一个节点(例如 indegree=1 和 outdegree=3),你可以有类似

val hubSource = inEdgeSource.toMat(BroadcastHub.sink(bufferSize = ...))(Keep.right).run()

val nodeSink1 = hubSource.to(outEdgeSink1).run()
val nodeSink2 = hubSource.to(outEdgeSink2).run()
val nodeSink3 = hubSource.to(outEdgeSink3).run()

Akka 文档:

http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Dynamic_fan-in_and_fan-out_with_MergeHub_and_BroadcastHub