Play Framework WebSocket Actor 过滤
Play Framework WebSocket Actor Filtering
我目前正在考虑一个实现,通过它我可以从我的 Play 框架 Web 应用程序流式传输一些事件。有一组 IoT 设备可以发出事件和警报。这些设备由它们的 id 标识。我有一个 HTTP 端点,可以通过它获取这些设备的遥测信号。现在我想对警报和事件做同样的事情。所以我开始使用这种简单的方法,首先在我的控制器中定义我的终点,如下所示:
def events = WebSocket.accept[String, String] { request =>
ActorFlow.actorRef { out =>
EventsActor.props(out)
}
}
我的活动演员:
class EventsActor(out: ActorRef) extends Actor {
def receive = {
case msg: String =>
out ! ("I received your message: " + msg)
}
}
object EventsActor {
def props(out: ActorRef) =
Props(new EventsActor(out))
}
现在,我没有对我的 EventsActor 做太多事情,但稍后这个 Actor 将获得事件和警报消息被推送到其中,然后将其路由到 WebSocket 端点。
现在我的要求是,在 WebSocket 端点中,当客户端建立连接时,他应该能够为他希望连接的物联网设备指定一个 ID,我应该能够将这个 ID 传递给我可以在其中过滤包含传入 ID 的事件的 EventsActor。
关于如何执行此操作的任何线索?
我做了一个简单的例子来说明你可以如何处理这个问题。虽然还有很多不足之处,但希望对你有所启发!
您实际上想要的是一个 coordinator/router,它可以跟踪哪些 websocket 参与者正在侦听哪些事件类型。您可以将该集线器注入所有已创建的角色,并向其发送事件以将这些 websocket 角色注册到事件集。
object TelemetryHub {
/** This is the message external sensors could use to stream the data in to the hub **/
case class FreshData(eventId: UUID, data: Int)
def props = Props(new TelemetryHub)
}
class TelemetryHub extends Actor {
type EventID = UUID
private var subscriptions =
mutable.Map.empty[EventID, Set[ActorRef]].withDefault(_ => Set())
override def receive = {
/** we can use the sender ref to add the requesting actor to the set of subscribers **/
case SubscribeTo(topic) => subscriptions(topic) = subscriptions(topic) + sender()
/** Now when the hub receives data, it can send a message to all subscribers
* of that particular topic
*/
case FreshData(incomingDataTopicID, data) =>
subscriptions.find { case (topic, _) => topic == incomingDataTopicID } match {
case Some((_, subscribers)) => subscribers foreach { _ ! EventData(data) }
case None => println("This topic was not yet subscribed to")
}
}
}
现在我们有了上面的结构,您的 websocket 端点可能如下所示:
object WebsocketClient {
/**
* Some messages with which we can subscribe to a topic
* These messages could be streamed through the websocket from the client
*/
case class SubscribeTo(eventID: UUID)
/** This is an example of some data we want to go back to the client. Uses int for simplicity **/
case class EventData(data: Int)
def props(out: ActorRef, telemetryHub: ActorRef) = Props(new WebsocketClient(out, telemetryHub))
}
/** Every client will own one of these actors. **/
class WebsocketClient(out: ActorRef, telemetryHub: ActorRef) extends Actor {
def receive = {
/** We can send these subscription request to a hub **/
case subscriptionRequest: SubscribeTo => telemetryHub ! subscriptionRequest
/** When we get data back, we can send it right to the client **/
case EventData(data) => out ! data
}
}
/** We can make a single hub which will be shared between all the connections **/
val telemetryHub = actorSys actorOf TelemetryHub.props
def events = WebSocket.accept[String, String] { _ =>
ActorFlow.actorRef { out => {
WebsocketClient.props(out, telemetryHub)
}}
}
或者,您可以使用 Akka 提供的内部事件总线,以更少的麻烦实现同样的事情!
我目前正在考虑一个实现,通过它我可以从我的 Play 框架 Web 应用程序流式传输一些事件。有一组 IoT 设备可以发出事件和警报。这些设备由它们的 id 标识。我有一个 HTTP 端点,可以通过它获取这些设备的遥测信号。现在我想对警报和事件做同样的事情。所以我开始使用这种简单的方法,首先在我的控制器中定义我的终点,如下所示:
def events = WebSocket.accept[String, String] { request =>
ActorFlow.actorRef { out =>
EventsActor.props(out)
}
}
我的活动演员:
class EventsActor(out: ActorRef) extends Actor {
def receive = {
case msg: String =>
out ! ("I received your message: " + msg)
}
}
object EventsActor {
def props(out: ActorRef) =
Props(new EventsActor(out))
}
现在,我没有对我的 EventsActor 做太多事情,但稍后这个 Actor 将获得事件和警报消息被推送到其中,然后将其路由到 WebSocket 端点。
现在我的要求是,在 WebSocket 端点中,当客户端建立连接时,他应该能够为他希望连接的物联网设备指定一个 ID,我应该能够将这个 ID 传递给我可以在其中过滤包含传入 ID 的事件的 EventsActor。
关于如何执行此操作的任何线索?
我做了一个简单的例子来说明你可以如何处理这个问题。虽然还有很多不足之处,但希望对你有所启发!
您实际上想要的是一个 coordinator/router,它可以跟踪哪些 websocket 参与者正在侦听哪些事件类型。您可以将该集线器注入所有已创建的角色,并向其发送事件以将这些 websocket 角色注册到事件集。
object TelemetryHub {
/** This is the message external sensors could use to stream the data in to the hub **/
case class FreshData(eventId: UUID, data: Int)
def props = Props(new TelemetryHub)
}
class TelemetryHub extends Actor {
type EventID = UUID
private var subscriptions =
mutable.Map.empty[EventID, Set[ActorRef]].withDefault(_ => Set())
override def receive = {
/** we can use the sender ref to add the requesting actor to the set of subscribers **/
case SubscribeTo(topic) => subscriptions(topic) = subscriptions(topic) + sender()
/** Now when the hub receives data, it can send a message to all subscribers
* of that particular topic
*/
case FreshData(incomingDataTopicID, data) =>
subscriptions.find { case (topic, _) => topic == incomingDataTopicID } match {
case Some((_, subscribers)) => subscribers foreach { _ ! EventData(data) }
case None => println("This topic was not yet subscribed to")
}
}
}
现在我们有了上面的结构,您的 websocket 端点可能如下所示:
object WebsocketClient {
/**
* Some messages with which we can subscribe to a topic
* These messages could be streamed through the websocket from the client
*/
case class SubscribeTo(eventID: UUID)
/** This is an example of some data we want to go back to the client. Uses int for simplicity **/
case class EventData(data: Int)
def props(out: ActorRef, telemetryHub: ActorRef) = Props(new WebsocketClient(out, telemetryHub))
}
/** Every client will own one of these actors. **/
class WebsocketClient(out: ActorRef, telemetryHub: ActorRef) extends Actor {
def receive = {
/** We can send these subscription request to a hub **/
case subscriptionRequest: SubscribeTo => telemetryHub ! subscriptionRequest
/** When we get data back, we can send it right to the client **/
case EventData(data) => out ! data
}
}
/** We can make a single hub which will be shared between all the connections **/
val telemetryHub = actorSys actorOf TelemetryHub.props
def events = WebSocket.accept[String, String] { _ =>
ActorFlow.actorRef { out => {
WebsocketClient.props(out, telemetryHub)
}}
}
或者,您可以使用 Akka 提供的内部事件总线,以更少的麻烦实现同样的事情!