Akka-Http Websockets:如何向消费者发送相同的数据流
Akka-Http Websockets: How to Send consumers the same stream of data
我有一个客户端可以连接到的 WebSocket 我还有一个使用 akka-streams 的数据流。我怎样才能让所有客户获得相同的数据。目前他们似乎在争夺数据。
谢谢
您可以做的一种方法是让一个 actor 扩展 ActorPublisher 并让它订阅
一些消息。
class MyPublisher extends ActorPublisher[MyData]{
override def preStart = {
context.system.eventStream.subscribe(self, classOf[MyData])
}
override def receive: Receive = {
case msg: MyData ⇒
if (isActive && totalDemand > 0) {
// Pushes the message onto the stream
onNext(msg)
}
}
}
object MyPublisher {
def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}
case class MyData(data:String)
然后您可以将该演员用作流的来源:
val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))
然后您可以从该数据源创建流并应用转换以将数据转换为 websocket 消息
val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})
然后您可以在路由处理中使用该流程。
path("readings") {
handleWebsocketMessages(myFlow)
}
从原始流的处理中,您可以将数据发布到事件流,该参与者的任何实例都会将其拾取并放入其 websocket 所服务的流中。
val actorSystem = ActorSystem("foo")
val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator)
otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}
每个套接字都将拥有自己的 actor 实例,以向其提供所有来自单一来源的数据。
我有一个客户端可以连接到的 WebSocket 我还有一个使用 akka-streams 的数据流。我怎样才能让所有客户获得相同的数据。目前他们似乎在争夺数据。
谢谢
您可以做的一种方法是让一个 actor 扩展 ActorPublisher 并让它订阅 一些消息。
class MyPublisher extends ActorPublisher[MyData]{
override def preStart = {
context.system.eventStream.subscribe(self, classOf[MyData])
}
override def receive: Receive = {
case msg: MyData ⇒
if (isActive && totalDemand > 0) {
// Pushes the message onto the stream
onNext(msg)
}
}
}
object MyPublisher {
def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}
case class MyData(data:String)
然后您可以将该演员用作流的来源:
val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))
然后您可以从该数据源创建流并应用转换以将数据转换为 websocket 消息
val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})
然后您可以在路由处理中使用该流程。
path("readings") {
handleWebsocketMessages(myFlow)
}
从原始流的处理中,您可以将数据发布到事件流,该参与者的任何实例都会将其拾取并放入其 websocket 所服务的流中。
val actorSystem = ActorSystem("foo")
val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator)
otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}
每个套接字都将拥有自己的 actor 实例,以向其提供所有来自单一来源的数据。