在具有自动重新连接功能的 Scala Play 框架中使用服务器发送的事件(SSE)

Consuming Server Sent Events(SSE) in scala play framework with automatic reconnect

我们如何在 scala play 框架中使用 SSE?我能找到的大部分资源都是制作 SSE 源。我想可靠地监听来自其他服务的 SSE 事件(使用自动连接)。最相关的文章是 https://doc.akka.io/docs/alpakka/current/sse.html 。我实现了这个,但这似乎不起作用(下面的代码)。还有我是su

的事件
@Singleton
class SseConsumer @Inject()((implicit ec: ExecutionContext) {

   implicit val system = ActorSystem()

  val send: HttpRequest => Future[HttpResponse] =  foo
  def foo(x:HttpRequest) = {
    try {
      println("foo")
      val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
      val newHeaders = x.withHeaders(authHeader)
      Http().singleRequest(newHeaders)

    }catch {
      case e:Exception => {
        println("Exception", e.printStackTrace())
        throw e
      }
    }
  }

  
  val eventSource: Source[ServerSentEvent, NotUsed] =
    EventSource(
      uri = Uri("https://abc/v1/events"),
      send,
      initialLastEventId = Some("2"),
      retryDelay = 1.second
    )


  def orderStatusEventStable() = {
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource
        .throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(10)
        .runWith(Sink.seq)
    events.map(_.foreach( x => {
      println("456")
      println(x.data)
    }))
  }



  Future {
    blocking{
      while(true){
        try{
          Thread.sleep(2000)
          orderStatusEventStable()
        } catch {
          case e:Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }



}

这不会给出任何异常,并且永远不会打印 println("456")。

编辑:

Future {
    blocking {
      while(true){
        try{
          Await.result(orderStatusEventStable() recover {
            case e: Exception => {
              println("exception", e)
              throw e
            }
          }, Duration.Inf)
        } catch {
          case e:Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }

添加了一个 await 并开始工作。一次可以阅读 10 条消息。但是现在我面临着另一个问题。 我有一个生产者,有时生产速度比我消耗的速度快,使用这段代码我有 2 个问题:

  1. 我必须等到 10 条消息可用。我们怎样才能采取最大。 10 分钟。共 0 条消息?
  2. 当生产率>消耗率时,我错过了几个事件。我猜这是由于节流造成的。我们如何使用背压来处理它?

您的代码中的问题是 events: Future 只会在流 (eventSource) 完成时完成。

我不熟悉 SSE,但在您的情况下,流可能永远不会完成,因为它一直在侦听新事件。

您可以在 Akka Stream 文档中了解更多信息。

根据您想对事件执行的操作,您可以 map 在直播中像这样:

eventSource
  ...
  .map(/* do something */)
  .runWith(...)

基本上,您需要使用 Akka Stream Source,因为数据正在通过它,但不要等待它完成。

编辑:我没有注意到 take(10),只有当 take 不在此处时我的回答才适用。发送 10 个事件后,您的代码应该可以正常工作。