如何从 Akka 事件流构建 Akka 流源?

How to build an Akka Streams Source from the Akka Event Stream?

MyActor 收到 Start 消息时,它会运行 Akka Stream,并将收到的每个项目发布到 Akka Event Stream

class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {

  override def receive: Receive = {
    case Start =>
      someSource
        .toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
        .run()
  }
}

现在在另一个代码块中,我想从该事件流中的那些项目构建一个 Source,这样每个发布的项目都可以在另一个 Akka Stream.

中处理

我该怎么做?

以防万一它可能会添加更多选项,请注意其他有问题的代码块是 Play framework 的 Websocket 处理程序。

这似乎是 XY problem。如果发布者和订阅者最终解耦,如果发布者产生数据的速度比订阅者快,会发生什么?

话虽如此,这里有一种方法可以满足您的要求:

/** Produce a source by subscribing to the Akka actorsystem event bus for a
  * specific event type.
  * 
  * @param bufferSize max number of events to buffer up in the source
  * @param overflowStrategy what to do if the event buffer fills up
  */
def itemSource[Item : ClassTag](
  bufferSize: Int = 1000,
  overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
  implicit system: ActorSystem
): Source[Item, NotUsed] = Source
  .lazily { () =>
    val (actorRef, itemSource) = Source
      .actorRef[Item](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize,
        overflowStrategy
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)

    itemSource
  }
  .mapMaterializedValue(_ => NotUsed)

我终于让它与 BroadcastHub 一起工作,不再有 Akka 事件流了。

我的发布者(它本身正在使用源)看起来像这样:

val publisherSource = someSource
  .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
  .run()

然后在另一个代码块中,我只需要引用 publisherSource:

val subscriberSource = publisherSource
  .map(...) // Whatever

您可以拥有任意数量的 subscriberSource,他们都会收到相同的物品。