如何将 keepAlive 事件添加到 SSE 通道

How to add keepAlive event to SSE channel

我有一个 ListenerActor,它正在侦听来自后端的消息并将消息作为 SSE 事件通过通道推送。

我想让我的演员活着,这样我就可以连续播放。如何将 keepAlive 添加到我的 actor。

P.S: 我没有使用 Akka 流或 Akka http。

def filter(inboxId:String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] = Enumeratee.filter[SSEPublisher.ListenerEnvelope] { envelope: SSEPublisher.ListenerEnvelope => envelope.inboxId == inboxId }

def convert: Enumeratee[SSEPublisher.ListenerEnvelope, String] = Enumeratee.map[SSEPublisher.ListenerEnvelope] {
  envelope =>
    Json.toJson(envelope).toString()
}

def connDeathWatch(addr: String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] =
  Enumeratee.onIterateeDone { () => println(addr + " - SSE disconnected")
}

implicit def pair[E]: EventNameExtractor[E] = EventNameExtractor[E] { p =>
  val parsedJson = scala.util.parsing.json.JSON.parseFull(s"$p").get
  val topic = parsedJson.asInstanceOf[Map[String, String]].apply("topic")
  Some(topic)
}

implicit def id[E]: EventIdExtractor[E] = EventIdExtractor[E](p => Some(UUID.randomUUID().toString))

def events(inboxId: String) = InboxResource(inboxId)(AuthScope.Basic)(authUser => Action { implicit request =>
  Ok.feed(content = ncf.sseEnumerator
    &> filter(inboxId)
    &> convert
    &> EventSource()
  ).as("text/event-stream")
})

override def receive: Receive = {
  case Tick =>
    log.info(s"sending re-register tick to event-publisher")
    Topics.all.foreach { a: Topic =>
      log.info(s"$a")
      clusterClient ! ClusterClient.SendToAll(publisherPath, SSEPublisher.AddListener(a, self))
    }

  case ListenerEnvelope(topic, inboxId, itemId, sourceId, message) =>
    log.info(s"Received message from event publisher for topic $topic, for inbox $inboxId, msg : $message")
    channel.push(SSEPublisher.ListenerEnvelope(topic, inboxId, itemId, sourceId, message))
}

您可以在 actor 级别创建一个 keepAlive 协议,并使用调度程序将 keepAlive 消息发送给 actor。

def convert(t: SomeType): Enumeratee[SSEPublisher.ListenerEnvelope, String] = 
// pattern match on type t
  }