Akka Stream - 将流拆分为多个源

Akka Stream - Splitting flow into multiple Sources

我在 Akka Stream 中有一个以 Sink 结束的 TCP 连接。现在所有消息都进入一个接收器。给定一些功能,我想将流分成未知数量的接收器。

用例如下,从TCP连接我得到了类似List[DeltaValue]的连续流,现在我想为每个DeltaValue.id创建一个actorSink,这样我就可以不断积累并为每个 DeltaValue.id 实施行为。我发现这是流处理中的标准用例,但我找不到 Akka Stream 的好例子。

这是我现在拥有的:

def connect(): ActorRef = tcpConnection
    .//SOMEHOW SPLIT HERE and create a ReceiverActor for each message
    .to(Sink.actorRef(system.actorOf(ReceiverActor.props(), ReceiverActor.name), akka.Done))
    .run()

更新: 我现在有了这个,不知道该说些什么,感觉不是很稳定,但应该可以用:

  private def spawnActorOrSendMessage(m: ResponseMessage): Unit = {
    implicit val timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
    system.actorSelection("user/" + m.id.toString).resolveOne().onComplete {
      case Success(actorRef) => actorRef ! m
      case Failure(ex) => (system.actorOf(ReceiverActor.props(), m.id.toString)) ! m
    }
  }

  def connect(): ActorRef = tcpConnection
    .to(Sink.foreachParallel(10)(spawnActorOrSendMessage))
    .run()

EventStream

您可以在流接收器中向 ActorSystem 的 EventStream 发送消息,并单独让 Actors 订阅流。

在流级别拆分

您可以使用 Broadcast. The documentation 在流级别拆分流,这是一个很好的例子。

在 Actor 级别拆分

您还可以使用 Sink.actorRef in combination with a BroadcastPool 向多个 Actor 广播消息。

下面应该是问题中更新内容的改进版本。主要改进是您的 actor 保存在数据结构中,以避免 actorSelection 对每个传入消息进行解析。

  case class DeltaValue(id: String, value: Double)

  val src: Source[DeltaValue, NotUsed] = ???

  src.runFold(Map[String, ActorRef]()){
    case (actors, elem) if actors.contains(elem.id) ⇒
      actors(elem.id) ! elem.value
      actors
    case (actors, elem) ⇒
      val newActor = system.actorOf(ReceiverActor.props(), ReceiverActor.name)
      newActor ! elem.value
      actors.updated(elem.id, newActor)
  }

请记住,当您将 Akka Streams 与裸 actor 集成时,您将失去背压支持。这就是为什么您应该尽可能在 Akka Streams 的边界内尝试和实现您的逻辑的原因之一。这并不总是可能的 - 例如当需要远程处理等时

对于您的情况,您可以考虑利用 groupBysubstream 的概念。下面的例子是通过对每个子流的元素求和来折叠它们,只是为了给出一个想法:

  src.groupBy(maxSubstreams = Int.MaxValue, f = _.id)
    .fold("" → 0d) {
      case ((id, acc), delta) ⇒ id → delta.value + acc
    }
    .mergeSubstreams
    .runForeach(println)