Akka Stream - 将流拆分为多个源
Akka Stream - Splitting flow into multiple Sources
我在 Akka Stream 中有一个以 Sink 结束的 TCP 连接。现在所有消息都进入一个接收器。给定一些功能,我想将流分成未知数量的接收器。
用例如下,从TCP连接我得到了类似List[DeltaValue]
的连续流,现在我想为每个DeltaValue.id
创建一个actorSink,这样我就可以不断积累并为每个 DeltaValue.id
实施行为。我发现这是流处理中的标准用例,但我找不到 Akka Stream 的好例子。
这是我现在拥有的:
def connect(): ActorRef = tcpConnection
.//SOMEHOW SPLIT HERE and create a ReceiverActor for each message
.to(Sink.actorRef(system.actorOf(ReceiverActor.props(), ReceiverActor.name), akka.Done))
.run()
更新:
我现在有了这个,不知道该说些什么,感觉不是很稳定,但应该可以用:
private def spawnActorOrSendMessage(m: ResponseMessage): Unit = {
implicit val timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
system.actorSelection("user/" + m.id.toString).resolveOne().onComplete {
case Success(actorRef) => actorRef ! m
case Failure(ex) => (system.actorOf(ReceiverActor.props(), m.id.toString)) ! m
}
}
def connect(): ActorRef = tcpConnection
.to(Sink.foreachParallel(10)(spawnActorOrSendMessage))
.run()
EventStream
您可以在流接收器中向 ActorSystem 的 EventStream
发送消息,并单独让 Actors 订阅流。
在流级别拆分
您可以使用 Broadcast. The documentation 在流级别拆分流,这是一个很好的例子。
在 Actor 级别拆分
您还可以使用 Sink.actorRef
in combination with a BroadcastPool
向多个 Actor 广播消息。
下面应该是问题中更新内容的改进版本。主要改进是您的 actor 保存在数据结构中,以避免 actorSelection
对每个传入消息进行解析。
case class DeltaValue(id: String, value: Double)
val src: Source[DeltaValue, NotUsed] = ???
src.runFold(Map[String, ActorRef]()){
case (actors, elem) if actors.contains(elem.id) ⇒
actors(elem.id) ! elem.value
actors
case (actors, elem) ⇒
val newActor = system.actorOf(ReceiverActor.props(), ReceiverActor.name)
newActor ! elem.value
actors.updated(elem.id, newActor)
}
请记住,当您将 Akka Streams 与裸 actor 集成时,您将失去背压支持。这就是为什么您应该尽可能在 Akka Streams 的边界内尝试和实现您的逻辑的原因之一。这并不总是可能的 - 例如当需要远程处理等时
对于您的情况,您可以考虑利用 groupBy
和 substream 的概念。下面的例子是通过对每个子流的元素求和来折叠它们,只是为了给出一个想法:
src.groupBy(maxSubstreams = Int.MaxValue, f = _.id)
.fold("" → 0d) {
case ((id, acc), delta) ⇒ id → delta.value + acc
}
.mergeSubstreams
.runForeach(println)
我在 Akka Stream 中有一个以 Sink 结束的 TCP 连接。现在所有消息都进入一个接收器。给定一些功能,我想将流分成未知数量的接收器。
用例如下,从TCP连接我得到了类似List[DeltaValue]
的连续流,现在我想为每个DeltaValue.id
创建一个actorSink,这样我就可以不断积累并为每个 DeltaValue.id
实施行为。我发现这是流处理中的标准用例,但我找不到 Akka Stream 的好例子。
这是我现在拥有的:
def connect(): ActorRef = tcpConnection
.//SOMEHOW SPLIT HERE and create a ReceiverActor for each message
.to(Sink.actorRef(system.actorOf(ReceiverActor.props(), ReceiverActor.name), akka.Done))
.run()
更新: 我现在有了这个,不知道该说些什么,感觉不是很稳定,但应该可以用:
private def spawnActorOrSendMessage(m: ResponseMessage): Unit = {
implicit val timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
system.actorSelection("user/" + m.id.toString).resolveOne().onComplete {
case Success(actorRef) => actorRef ! m
case Failure(ex) => (system.actorOf(ReceiverActor.props(), m.id.toString)) ! m
}
}
def connect(): ActorRef = tcpConnection
.to(Sink.foreachParallel(10)(spawnActorOrSendMessage))
.run()
EventStream
您可以在流接收器中向 ActorSystem 的 EventStream
发送消息,并单独让 Actors 订阅流。
在流级别拆分
您可以使用 Broadcast. The documentation 在流级别拆分流,这是一个很好的例子。
在 Actor 级别拆分
您还可以使用 Sink.actorRef
in combination with a BroadcastPool
向多个 Actor 广播消息。
下面应该是问题中更新内容的改进版本。主要改进是您的 actor 保存在数据结构中,以避免 actorSelection
对每个传入消息进行解析。
case class DeltaValue(id: String, value: Double)
val src: Source[DeltaValue, NotUsed] = ???
src.runFold(Map[String, ActorRef]()){
case (actors, elem) if actors.contains(elem.id) ⇒
actors(elem.id) ! elem.value
actors
case (actors, elem) ⇒
val newActor = system.actorOf(ReceiverActor.props(), ReceiverActor.name)
newActor ! elem.value
actors.updated(elem.id, newActor)
}
请记住,当您将 Akka Streams 与裸 actor 集成时,您将失去背压支持。这就是为什么您应该尽可能在 Akka Streams 的边界内尝试和实现您的逻辑的原因之一。这并不总是可能的 - 例如当需要远程处理等时
对于您的情况,您可以考虑利用 groupBy
和 substream 的概念。下面的例子是通过对每个子流的元素求和来折叠它们,只是为了给出一个想法:
src.groupBy(maxSubstreams = Int.MaxValue, f = _.id)
.fold("" → 0d) {
case ((id, acc), delta) ⇒ id → delta.value + acc
}
.mergeSubstreams
.runForeach(println)