当另一个 Sink 接收时发出的 Akka Source

Akka Source that emits when another Sink receives

我有一个源 a 将值发送到接收器 b

现在我想要另一个源 c 发出一个值,每次 b 收到一个事件。

我的想法是使用另一个可用作通知程序的接收器 d,但随后我需要从接收器创建源的功能。

a.to(b).alsoTo(d)

类似

Source.from(d)

另一种描述方式是,您希望 a 发出的每个事件都发送到 bc。这就是 BroadcastHub 所做的;它可用于允许来自一个 Source 的事件被多个 Sinks.

使用

如果您将 Source 连接到 BroadcastHub.sink,然后将其具体化,您将获得一个新的 Source。然后可以将此 Source 附加到 2 个或更多 Sink,每个 Sink 将获得原始 Source 发送的消息的副本。

例如,我将它与 Akka 一起使用,让一个 Actor 向多个客户端广播消息(对于 gRPC 事件):

val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
  ActorSource.actorRef[Event](
    completionMatcher = PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    16,
    OverflowStrategy.fail
  )
    .toMat(BroadcastHub.sink)(Keep.both)
    .run()

这会创建 eventSource,它可以在管道中使用并多次具体化以创建多个流。每次向 actorRef 发送消息时,从 eventSource 实现的每个流都会收到该消息。

有关详细信息,请参阅 the documentation