Play framework Scala:使用 scala akka 流创建无限源并在服务器上保持服务器发送的事件连接打开
Play framework Scala: Create infinite source using scala akka streams and keep Server sent events connection open on server
我们需要为以下用例实施服务器发送的事件:
- 在服务器上进行一些处理后,将通知发送到 UI。这个处理是基于一些逻辑
- 从 RabbitMQ 读取消息并对其执行一些操作后,向 UI 发送通知。
我们使用 Scala(2.11/2.12) 和 Play 框架(2.6.x) 的技术集。
图书馆:akka.stream.scaladsl.Source
我们从以下示例开始我们的概念验证 https://github.com/playframework/play-scala-streaming-example,然后我们通过创建不同的源进行扩展。
我们尝试使用 Source.apply、soure.single.
创建源
但是一旦源中的所有元素都被推送到 UI,我的事件流就关闭了。但我不希望事件流关闭。我也不想使用一些计时器(Source.tick)或Source.repeat。
创建我的源时,集合假设有一些 x 元素,然后服务又添加了 4 个元素。但是在 x 个元素之后,事件流关闭然后又重新打开。
有什么方法可以让我的事件流无限大,并且只有在我的会话注销后才会关闭,或者我们可以明确关闭它。
//KeepAlive代码(如评论中所问)
object NotficationUtil {
var userNotificationMap = Map[Integer, Queue[String]]()
def addUserNotification(userId: Integer, message: String) = {
var queue = userNotificationMap.getOrElse(userId, Queue[String]())
queue += message
userNotificationMap.put(userId, queue)
}
def pushNotification(userId: Integer): Source[JsValue, _] = {
var queue = userNotificationMap.getOrElse(userId, Queue[String]())
Source.single(Json.toJson(queue.dequeueAll { x => true }))
}
}
@Singleton
class EventSourceController @Inject() (cc: ControllerComponents) extends AbstractController(cc) with FlowFactory{
def pushNotifications(user_id:Integer) = Action {
val stream = NotficationUtil.pushNotification(user_id)
Ok.chunked(stream.keepAlive(50.second, ()=>Json.obj("data"->"heartbeat")) via EventSource.flow).as(ContentTypes.EVENT_STREAM)
}
}
使用下面的代码创建 actorref 和 publisher
val (ref, sourcePublisher)= Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail).toMat(Sink.asPublisher(true))(Keep.both).run()
并从此发布者创建您的来源
val testsource = Source
.fromPublisher[T](sourcePublisher)
并将您的监听器注册为
Ok.chunked(
testsource.keepAlive(
50.seconds,
() => Json.obj("data"->"heartbeat")) via EventSource.flow)
.as(ContentTypes.EVENT_STREAM)
.withHeaders("X-Accel-Buffering" -> "no", "Cache-Control" -> "no-cache")
将您的 json 数据发送给 ref actor,数据将作为事件流通过此源流向前端。
希望对你有帮助。
我们需要为以下用例实施服务器发送的事件:
- 在服务器上进行一些处理后,将通知发送到 UI。这个处理是基于一些逻辑
- 从 RabbitMQ 读取消息并对其执行一些操作后,向 UI 发送通知。
我们使用 Scala(2.11/2.12) 和 Play 框架(2.6.x) 的技术集。 图书馆:akka.stream.scaladsl.Source
我们从以下示例开始我们的概念验证 https://github.com/playframework/play-scala-streaming-example,然后我们通过创建不同的源进行扩展。 我们尝试使用 Source.apply、soure.single.
创建源但是一旦源中的所有元素都被推送到 UI,我的事件流就关闭了。但我不希望事件流关闭。我也不想使用一些计时器(Source.tick)或Source.repeat。
创建我的源时,集合假设有一些 x 元素,然后服务又添加了 4 个元素。但是在 x 个元素之后,事件流关闭然后又重新打开。
有什么方法可以让我的事件流无限大,并且只有在我的会话注销后才会关闭,或者我们可以明确关闭它。
//KeepAlive代码(如评论中所问)
object NotficationUtil {
var userNotificationMap = Map[Integer, Queue[String]]()
def addUserNotification(userId: Integer, message: String) = {
var queue = userNotificationMap.getOrElse(userId, Queue[String]())
queue += message
userNotificationMap.put(userId, queue)
}
def pushNotification(userId: Integer): Source[JsValue, _] = {
var queue = userNotificationMap.getOrElse(userId, Queue[String]())
Source.single(Json.toJson(queue.dequeueAll { x => true }))
}
}
@Singleton
class EventSourceController @Inject() (cc: ControllerComponents) extends AbstractController(cc) with FlowFactory{
def pushNotifications(user_id:Integer) = Action {
val stream = NotficationUtil.pushNotification(user_id)
Ok.chunked(stream.keepAlive(50.second, ()=>Json.obj("data"->"heartbeat")) via EventSource.flow).as(ContentTypes.EVENT_STREAM)
}
}
使用下面的代码创建 actorref 和 publisher
val (ref, sourcePublisher)= Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail).toMat(Sink.asPublisher(true))(Keep.both).run()
并从此发布者创建您的来源
val testsource = Source
.fromPublisher[T](sourcePublisher)
并将您的监听器注册为
Ok.chunked(
testsource.keepAlive(
50.seconds,
() => Json.obj("data"->"heartbeat")) via EventSource.flow)
.as(ContentTypes.EVENT_STREAM)
.withHeaders("X-Accel-Buffering" -> "no", "Cache-Control" -> "no-cache")
将您的 json 数据发送给 ref actor,数据将作为事件流通过此源流向前端。 希望对你有帮助。