Play Framework WebSocket Actor 过滤

Play Framework WebSocket Actor Filtering

我目前正在考虑一个实现,通过它我可以从我的 Play 框架 Web 应用程序流式传输一些事件。有一组 IoT 设备可以发出事件和警报。这些设备由它们的 id 标识。我有一个 HTTP 端点,可以通过它获取这些设备的遥测信号。现在我想对警报和事件做同样的事情。所以我开始使用这种简单的方法,首先在我的控制器中定义我的终点,如下所示:

  def events = WebSocket.accept[String, String] { request =>
    ActorFlow.actorRef { out =>
      EventsActor.props(out)
    }
  }

我的活动演员:

class EventsActor(out: ActorRef) extends Actor {

  def receive = {
    case msg: String =>
      out ! ("I received your message: " + msg)
  }
}
object EventsActor {

  def props(out: ActorRef) =
    Props(new EventsActor(out))
}

现在,我没有对我的 EventsActor 做太多事情,但稍后这个 Actor 将获得事件和警报消息被推送到其中,然后将其路由到 WebSocket 端点。

现在我的要求是,在 WebSocket 端点中,当客户端建立连接时,他应该能够为他希望连接的物联网设备指定一个 ID,我应该能够将这个 ID 传递给我可以在其中过滤包含传入 ID 的事件的 EventsActor。

关于如何执行此操作的任何线索?

我做了一个简单的例子来说明你可以如何处理这个问题。虽然还有很多不足之处,但希望对你有所启发!

您实际上想要的是一个 coordinator/router,它可以跟踪哪些 websocket 参与者正在侦听哪些事件类型。您可以将该集线器注入所有已创建的角色,并向其发送事件以将这些 websocket 角色注册到事件集。

object TelemetryHub {
    /** This is the message external sensors could use to stream the data in to the hub **/
    case class FreshData(eventId: UUID, data: Int)
    def props = Props(new TelemetryHub)
}

class TelemetryHub extends Actor {

  type EventID = UUID

  private var subscriptions =
    mutable.Map.empty[EventID, Set[ActorRef]].withDefault(_ => Set())

  override def receive = {
    /** we can use the sender ref to add the requesting actor to the set of subscribers **/
    case SubscribeTo(topic) => subscriptions(topic) = subscriptions(topic) + sender()

    /** Now when the hub receives data, it can send a message to all subscribers
     * of that particular topic
     */
    case FreshData(incomingDataTopicID, data) =>
      subscriptions.find { case (topic, _) => topic == incomingDataTopicID } match {
        case Some((_, subscribers)) => subscribers foreach { _ ! EventData(data) }
        case None => println("This topic was not yet subscribed to")
      }
  }

}

现在我们有了上面的结构,您的 websocket 端点可能如下所示:

object WebsocketClient {

  /**
   * Some messages with which we can subscribe to a topic
   * These messages could be streamed through the websocket from the client
   */
  case class SubscribeTo(eventID: UUID)
  /** This is an example of some data we want to go back to the client. Uses int for simplicity **/
  case class EventData(data: Int)

  def props(out: ActorRef, telemetryHub: ActorRef) = Props(new WebsocketClient(out, telemetryHub))
}

/** Every client will own one of these actors. **/
class WebsocketClient(out: ActorRef, telemetryHub: ActorRef) extends Actor {
  def receive = {
    /** We can send these subscription request to a hub **/
    case subscriptionRequest: SubscribeTo => telemetryHub ! subscriptionRequest
    /** When we get data back, we can send it right to the client **/
    case EventData(data) => out ! data
  }
}

/** We can make a single hub which will be shared between all the connections **/
val telemetryHub = actorSys actorOf TelemetryHub.props

def events = WebSocket.accept[String, String] { _ =>
  ActorFlow.actorRef { out => {
    WebsocketClient.props(out, telemetryHub)
  }}
}

或者,您可以使用 Akka 提供的内部事件总线,以更少的麻烦实现同样的事情!