Akka MergeHub 和 BroadcastHub 通过 Actor 通过 websockets 支持多个客户端

Akka MergeHub and BroadcastHub via Actor to support multiple clients through websockets

目前我使用 websocket

服务器推送keepAlive消息,同时响应客户端的请求。

现在我想让事情变得更有趣,并增加处理 n 客户的可能性。

所以我看了看:

https://github.com/playframework/play-scala-chatroom-example

这基本上是一个 n-Inlet ~> n-Outlet 所以如果 n 个客户端中的任何一个通过他们各自的 websocket 写了一些东西,他们都会收到通知(包括它自己)。

我需要的是更复杂的服务器应该

  1. 仍然向 所有 连接的客户端发送 keepAlive 消息 AND
  2. 如果其中一个客户端请求 something/triggers 服务器端 "event",再次 所有 客户端应该被通知。

所以这基本上只是我抽象思维方式中的一步。

虽然我很天真,但我认为可以通过以下方式完成:

type AllowedWSMessage = String

val myActor = system.actorOf(Props{new myActor}, "myActor")
val myActorSink = Sink.actorRefWithAck(myActor, "init", "acknowledged", "completed")
import scala.concurrent.duration._

val tickingSource: Source[AllowedWSMessage, Cancellable] =
Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
  .map(_ => "Staying Alive")

val serverMessageSource = Source
.queue[AllowedWSMessage](10, OverflowStrategy.backpressure)
.mapMaterializedValue { queue => myActor ! InitTunnel(queue)}

val serverSource: Source[AllowedWSMessage, Cancellable] = tickingSource.merge(serverMessageSource)

private val (clientSink, clientSource) =
{
    // Don't log MergeHub$ProducerFailed as error if the client disconnects.
    // recoverWithRetries -1 is essentially "recoverWith"
    val source = MergeHub.source[AllowedWSMessage]
      .log("source")
      .recoverWithRetries(-1, { case _: Exception ⇒ Source.empty})

    val sink: Sink[AllowedWSMessage, Source[AllowedWSMessage, NotUsed]] = BroadcastHub.sink[AllowedWSMessage]
    source.via(serverSource).toMat(sink)(Keep.both).run()
  }

(注意source.via(serverSource)...)

但当然没那么容易。

最后我想要的基本上是:

(Client -> WebSocket ->) MergeHub ~> myActor ~> BroadcastHub (-> WebSocket -> Client)

现在我想知道,这样做的优雅方式是什么? 或者 MergeHub 和 BroadcastHub 是应对该挑战的错误工具?

你有你的服务器源和接收器,你说它们已经工作了,所以我没有深入研究它们。

val fanIn = MergeHub.source[AllowedWSMessage].to(myActorSink).run()
val fanOut = serverSource.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()

// Now, somewhere in a (route-)method where you handle the websocket connection

Flow.fromSinkAndSource(fanIn, fanOut)

就这么简单,希望你脑子里的结现在解开了:)