如何将 keepAlive 事件添加到 SSE 通道
How to add keepAlive event to SSE channel
我有一个 ListenerActor,它正在侦听来自后端的消息并将消息作为 SSE 事件通过通道推送。
我想让我的 actor 保持存活,以便能够持续不断地推送事件流。如何为我的 actor 添加 keepAlive?
P.S.:我没有使用 Akka Streams 或 Akka HTTP。
/** Builds an Enumeratee that lets through only the envelopes addressed to one inbox.
  *
  * @param inboxId inbox identifier compared against each envelope's `inboxId`
  * @return an Enumeratee that drops every envelope whose `inboxId` differs from `inboxId`
  */
def filter(inboxId: String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] =
  Enumeratee.filter[SSEPublisher.ListenerEnvelope](_.inboxId == inboxId)
/** Enumeratee that turns each envelope into the string form of its JSON representation.
  *
  * Serialisation goes through `Json.toJson`, so it relies on whatever JSON writer for
  * `SSEPublisher.ListenerEnvelope` is in implicit scope at this point.
  */
def convert: Enumeratee[SSEPublisher.ListenerEnvelope, String] =
  Enumeratee.map[SSEPublisher.ListenerEnvelope] { msg =>
    val json = Json.toJson(msg)
    json.toString()
  }
/** Pass-through Enumeratee that prints a message once the downstream iteratee is done.
  *
  * @param addr label printed in front of the " - SSE disconnected" message
  * @return an Enumeratee that leaves the envelopes untouched
  */
def connDeathWatch(addr: String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] = {
  // Side effect run when the consuming iteratee finishes.
  def reportDisconnect(): Unit = println(addr + " - SSE disconnected")

  Enumeratee.onIterateeDone[SSEPublisher.ListenerEnvelope](() => reportDisconnect())
}
/** Derives the SSE event name from the `"topic"` field of the element's JSON text.
  *
  * The element is rendered with string interpolation and parsed as JSON. The event
  * name is the value of the top-level `"topic"` key when that value is a string.
  * If the text is not valid JSON, is not a JSON object, or has no string `"topic"`,
  * no event name is produced (`None`) instead of throwing, so a single malformed
  * payload cannot break the event stream.
  *
  * @tparam E element type flowing into the event source
  * @return an extractor yielding `Some(topic)` or `None`
  */
implicit def pair[E]: EventNameExtractor[E] = EventNameExtractor[E] { p =>
  scala.util.parsing.json.JSON.parseFull(s"$p") match {
    case Some(fields: Map[_, _]) =>
      // Widen to Iterable[Any] so the entries can be matched without an unchecked cast.
      val entries: Iterable[Any] = fields
      entries.collectFirst { case ("topic", name: String) => name }
    case _ =>
      None
  }
}
/** Assigns every event a freshly generated random UUID as its SSE id.
  *
  * The element itself is ignored; each call produces a new identifier.
  *
  * @tparam E element type flowing into the event source
  */
implicit def id[E]: EventIdExtractor[E] =
  EventIdExtractor[E] { _ =>
    Some(UUID.randomUUID().toString)
  }
/** Streams the server-sent events of one inbox as a `text/event-stream` response.
  *
  * The shared `ncf.sseEnumerator` is narrowed to envelopes of `inboxId`, each envelope
  * is rendered as a JSON string, and the strings are wrapped as SSE events.
  *
  * @param inboxId inbox whose events are streamed; also passed to `InboxResource`
  */
def events(inboxId: String) = InboxResource(inboxId)(AuthScope.Basic) { authUser =>
  Action { implicit request =>
    // Pipeline: all envelopes -> this inbox only -> JSON text -> SSE events.
    val sseStream =
      ncf.sseEnumerator &>
        filter(inboxId) &>
        convert &>
        EventSource()

    Ok.feed(content = sseStream).as("text/event-stream")
  }
}
/** Message handling of the listener actor.
  *
  *  - `Tick`: logs, then for every topic in `Topics.all` sends an
  *    `SSEPublisher.AddListener` (with this actor as the listener) to all publishers
  *    at `publisherPath` through the cluster client.
  *  - `ListenerEnvelope`: logs the message and pushes an equivalent
  *    `SSEPublisher.ListenerEnvelope` into `channel`.
  */
override def receive: Receive = {
  case Tick =>
    log.info(s"sending re-register tick to event-publisher")
    for (a <- Topics.all) {
      log.info(s"$a")
      val registration = SSEPublisher.AddListener(a, self)
      clusterClient ! ClusterClient.SendToAll(publisherPath, registration)
    }

  case ListenerEnvelope(topic, inboxId, itemId, sourceId, message) =>
    log.info(s"Received message from event publisher for topic $topic, for inbox $inboxId, msg : $message")
    val outgoing = SSEPublisher.ListenerEnvelope(topic, inboxId, itemId, sourceId, message)
    channel.push(outgoing)
}
您可以在 actor 级别定义一个 keepAlive 协议,并使用调度器(scheduler)定期向该 actor 发送 keepAlive 消息。
// Sketch from the answer: a variant of `convert` that takes an extra argument `t`.
// Only the signature is given; the body is a placeholder. Presumably it should match
// on `t` so keepAlive messages are rendered differently from regular envelopes
// (TODO confirm; `SomeType` is not defined in this snippet).
def convert(t: SomeType): Enumeratee[SSEPublisher.ListenerEnvelope, String] =
// pattern match on type t
}
我有一个 ListenerActor,它正在侦听来自后端的消息并将消息作为 SSE 事件通过通道推送。
我想让我的 actor 保持存活,以便能够持续不断地推送事件流。如何为我的 actor 添加 keepAlive?
P.S.:我没有使用 Akka Streams 或 Akka HTTP。
/** Builds an Enumeratee that lets through only the envelopes addressed to one inbox.
  *
  * @param inboxId inbox identifier compared against each envelope's `inboxId`
  * @return an Enumeratee that drops every envelope whose `inboxId` differs from `inboxId`
  */
def filter(inboxId: String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] =
  Enumeratee.filter[SSEPublisher.ListenerEnvelope](_.inboxId == inboxId)
/** Enumeratee that turns each envelope into the string form of its JSON representation.
  *
  * Serialisation goes through `Json.toJson`, so it relies on whatever JSON writer for
  * `SSEPublisher.ListenerEnvelope` is in implicit scope at this point.
  */
def convert: Enumeratee[SSEPublisher.ListenerEnvelope, String] =
  Enumeratee.map[SSEPublisher.ListenerEnvelope] { msg =>
    val json = Json.toJson(msg)
    json.toString()
  }
/** Pass-through Enumeratee that prints a message once the downstream iteratee is done.
  *
  * @param addr label printed in front of the " - SSE disconnected" message
  * @return an Enumeratee that leaves the envelopes untouched
  */
def connDeathWatch(addr: String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] = {
  // Side effect run when the consuming iteratee finishes.
  def reportDisconnect(): Unit = println(addr + " - SSE disconnected")

  Enumeratee.onIterateeDone[SSEPublisher.ListenerEnvelope](() => reportDisconnect())
}
/** Derives the SSE event name from the `"topic"` field of the element's JSON text.
  *
  * The element is rendered with string interpolation and parsed as JSON. The event
  * name is the value of the top-level `"topic"` key when that value is a string.
  * If the text is not valid JSON, is not a JSON object, or has no string `"topic"`,
  * no event name is produced (`None`) instead of throwing, so a single malformed
  * payload cannot break the event stream.
  *
  * @tparam E element type flowing into the event source
  * @return an extractor yielding `Some(topic)` or `None`
  */
implicit def pair[E]: EventNameExtractor[E] = EventNameExtractor[E] { p =>
  scala.util.parsing.json.JSON.parseFull(s"$p") match {
    case Some(fields: Map[_, _]) =>
      // Widen to Iterable[Any] so the entries can be matched without an unchecked cast.
      val entries: Iterable[Any] = fields
      entries.collectFirst { case ("topic", name: String) => name }
    case _ =>
      None
  }
}
/** Assigns every event a freshly generated random UUID as its SSE id.
  *
  * The element itself is ignored; each call produces a new identifier.
  *
  * @tparam E element type flowing into the event source
  */
implicit def id[E]: EventIdExtractor[E] =
  EventIdExtractor[E] { _ =>
    Some(UUID.randomUUID().toString)
  }
/** Streams the server-sent events of one inbox as a `text/event-stream` response.
  *
  * The shared `ncf.sseEnumerator` is narrowed to envelopes of `inboxId`, each envelope
  * is rendered as a JSON string, and the strings are wrapped as SSE events.
  *
  * @param inboxId inbox whose events are streamed; also passed to `InboxResource`
  */
def events(inboxId: String) = InboxResource(inboxId)(AuthScope.Basic) { authUser =>
  Action { implicit request =>
    // Pipeline: all envelopes -> this inbox only -> JSON text -> SSE events.
    val sseStream =
      ncf.sseEnumerator &>
        filter(inboxId) &>
        convert &>
        EventSource()

    Ok.feed(content = sseStream).as("text/event-stream")
  }
}
/** Message handling of the listener actor.
  *
  *  - `Tick`: logs, then for every topic in `Topics.all` sends an
  *    `SSEPublisher.AddListener` (with this actor as the listener) to all publishers
  *    at `publisherPath` through the cluster client.
  *  - `ListenerEnvelope`: logs the message and pushes an equivalent
  *    `SSEPublisher.ListenerEnvelope` into `channel`.
  */
override def receive: Receive = {
  case Tick =>
    log.info(s"sending re-register tick to event-publisher")
    for (a <- Topics.all) {
      log.info(s"$a")
      val registration = SSEPublisher.AddListener(a, self)
      clusterClient ! ClusterClient.SendToAll(publisherPath, registration)
    }

  case ListenerEnvelope(topic, inboxId, itemId, sourceId, message) =>
    log.info(s"Received message from event publisher for topic $topic, for inbox $inboxId, msg : $message")
    val outgoing = SSEPublisher.ListenerEnvelope(topic, inboxId, itemId, sourceId, message)
    channel.push(outgoing)
}
您可以在 actor 级别定义一个 keepAlive 协议,并使用调度器(scheduler)定期向该 actor 发送 keepAlive 消息。
// Sketch from the answer: a variant of `convert` that takes an extra argument `t`.
// Only the signature is given; the body is a placeholder. Presumably it should match
// on `t` so keepAlive messages are rendered differently from regular envelopes
// (TODO confirm; `SomeType` is not defined in this snippet).
def convert(t: SomeType): Enumeratee[SSEPublisher.ListenerEnvelope, String] =
// pattern match on type t
}