当另一个 Sink 接收时发出的 Akka Source
Akka Source that emits when another Sink receives
我有一个源 a
将值发送到接收器 b
。
现在我想要另一个源 c
发出一个值,每次 b 收到一个事件。
我的想法是使用另一个可用作通知程序的接收器 d
,但随后我需要从接收器创建源的功能。
a.to(b).alsoTo(d)
类似
Source.from(d)
另一种描述方式是,您希望 a
发出的每个事件都发送到 b
和 c
。这就是 BroadcastHub
所做的;它可用于允许来自一个 Source
的事件被多个 Sinks
.
使用
如果您将 Source
连接到 BroadcastHub.sink
,然后将其具体化,您将获得一个新的 Source
。然后可以将此 Source
附加到 2 个或更多 Sink
,每个 Sink
将获得原始 Source
发送的消息的副本。
例如,我将它与 Akka 一起使用,让一个 Actor 向多个客户端广播消息(对于 gRPC 事件):
val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
ActorSource.actorRef[Event](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
16,
OverflowStrategy.fail
)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
这会创建 eventSource
,它可以在管道中使用并多次具体化以创建多个流。每次向 actorRef
发送消息时,从 eventSource
实现的每个流都会收到该消息。
有关详细信息,请参阅 the documentation。
我有一个源 a
将值发送到接收器 b
。
现在我想要另一个源 c
发出一个值,每次 b 收到一个事件。
我的想法是使用另一个可用作通知程序的接收器 d
,但随后我需要从接收器创建源的功能。
a.to(b).alsoTo(d)
类似
Source.from(d)
另一种描述方式是,您希望 a
发出的每个事件都发送到 b
和 c
。这就是 BroadcastHub
所做的;它可用于允许来自一个 Source
的事件被多个 Sinks
.
如果您将 Source
连接到 BroadcastHub.sink
,然后将其具体化,您将获得一个新的 Source
。然后可以将此 Source
附加到 2 个或更多 Sink
,每个 Sink
将获得原始 Source
发送的消息的副本。
例如,我将它与 Akka 一起使用,让一个 Actor 向多个客户端广播消息(对于 gRPC 事件):
val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
ActorSource.actorRef[Event](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
16,
OverflowStrategy.fail
)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
这会创建 eventSource
,它可以在管道中使用并多次具体化以创建多个流。每次向 actorRef
发送消息时,从 eventSource
实现的每个流都会收到该消息。
有关详细信息,请参阅 the documentation。