如何从 Akka 事件流构建 Akka 流源?
How to build an Akka Streams Source from the Akka Event Stream?
当 MyActor
收到 Start
消息时,它会运行 Akka Stream
,并将收到的每个项目发布到 Akka Event Stream
。
class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {
override def receive: Receive = {
case Start =>
someSource
.toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
.run()
}
}
现在在另一个代码块中,我想从该事件流中的那些项目构建一个 Source
,这样每个发布的项目都可以在另一个 Akka Stream
.
中处理
我该怎么做?
以防万一它可能会添加更多选项,请注意其他有问题的代码块是 Play framework
的 Websocket 处理程序。
这似乎是 XY problem。如果发布者和订阅者最终解耦,如果发布者产生数据的速度比订阅者快,会发生什么?
话虽如此,这里有一种方法可以满足您的要求:
/** Produce a source by subscribing to the Akka actorsystem event bus for a
* specific event type.
*
* @param bufferSize max number of events to buffer up in the source
* @param overflowStrategy what to do if the event buffer fills up
*/
def itemSource[Item : ClassTag](
bufferSize: Int = 1000,
overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
implicit system: ActorSystem
): Source[Item, NotUsed] = Source
.lazily { () =>
val (actorRef, itemSource) = Source
.actorRef[Item](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
bufferSize,
overflowStrategy
)
.preMaterialize()
system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)
itemSource
}
.mapMaterializedValue(_ => NotUsed)
我终于让它与 BroadcastHub 一起工作,不再有 Akka 事件流了。
我的发布者(它本身正在使用源)看起来像这样:
val publisherSource = someSource
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
.run()
然后在另一个代码块中,我只需要引用 publisherSource:
val subscriberSource = publisherSource
.map(...) // Whatever
您可以拥有任意数量的 subscriberSource
,他们都会收到相同的物品。
当 MyActor
收到 Start
消息时,它会运行 Akka Stream
,并将收到的每个项目发布到 Akka Event Stream
。
class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {
override def receive: Receive = {
case Start =>
someSource
.toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
.run()
}
}
现在在另一个代码块中,我想从该事件流中的那些项目构建一个 Source
,这样每个发布的项目都可以在另一个 Akka Stream
.
我该怎么做?
以防万一它可能会添加更多选项,请注意其他有问题的代码块是 Play framework
的 Websocket 处理程序。
这似乎是 XY problem。如果发布者和订阅者最终解耦,如果发布者产生数据的速度比订阅者快,会发生什么?
话虽如此,这里有一种方法可以满足您的要求:
/** Produce a source by subscribing to the Akka actorsystem event bus for a
* specific event type.
*
* @param bufferSize max number of events to buffer up in the source
* @param overflowStrategy what to do if the event buffer fills up
*/
def itemSource[Item : ClassTag](
bufferSize: Int = 1000,
overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
implicit system: ActorSystem
): Source[Item, NotUsed] = Source
.lazily { () =>
val (actorRef, itemSource) = Source
.actorRef[Item](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
bufferSize,
overflowStrategy
)
.preMaterialize()
system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)
itemSource
}
.mapMaterializedValue(_ => NotUsed)
我终于让它与 BroadcastHub 一起工作,不再有 Akka 事件流了。
我的发布者(它本身正在使用源)看起来像这样:
val publisherSource = someSource
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
.run()
然后在另一个代码块中,我只需要引用 publisherSource:
val subscriberSource = publisherSource
.map(...) // Whatever
您可以拥有任意数量的 subscriberSource
,他们都会收到相同的物品。