在具有自动重新连接功能的 Scala Play 框架中使用服务器发送的事件(SSE)
Consuming Server Sent Events(SSE) in scala play framework with automatic reconnect
我们如何在 scala play 框架中使用 SSE?我能找到的大部分资源都是制作 SSE 源。我想可靠地监听来自其他服务的 SSE 事件(使用自动连接)。最相关的文章是 https://doc.akka.io/docs/alpakka/current/sse.html 。我实现了这个,但这似乎不起作用(下面的代码)。还有我是su
的事件
@Singleton
class SseConsumer @Inject()((implicit ec: ExecutionContext) {
implicit val system = ActorSystem()
val send: HttpRequest => Future[HttpResponse] = foo
def foo(x:HttpRequest) = {
try {
println("foo")
val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
val newHeaders = x.withHeaders(authHeader)
Http().singleRequest(newHeaders)
}catch {
case e:Exception => {
println("Exception", e.printStackTrace())
throw e
}
}
}
val eventSource: Source[ServerSentEvent, NotUsed] =
EventSource(
uri = Uri("https://abc/v1/events"),
send,
initialLastEventId = Some("2"),
retryDelay = 1.second
)
def orderStatusEventStable() = {
val events: Future[immutable.Seq[ServerSentEvent]] =
eventSource
.throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
.take(10)
.runWith(Sink.seq)
events.map(_.foreach( x => {
println("456")
println(x.data)
}))
}
Future {
blocking{
while(true){
try{
Thread.sleep(2000)
orderStatusEventStable()
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
}
这不会给出任何异常,并且永远不会打印 println("456")。
编辑:
Future {
blocking {
while(true){
try{
Await.result(orderStatusEventStable() recover {
case e: Exception => {
println("exception", e)
throw e
}
}, Duration.Inf)
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
添加了一个 await 并开始工作。一次可以阅读 10 条消息。但是现在我面临着另一个问题。
我有一个生产者,有时生产速度比我消耗的速度快,使用这段代码我有 2 个问题:
- 我必须等到 10 条消息可用。我们怎样才能采取最大。 10 分钟。共 0 条消息?
- 当生产率>消耗率时,我错过了几个事件。我猜这是由于节流造成的。我们如何使用背压来处理它?
您的代码中的问题是 events: Future
只会在流 (eventSource
) 完成时完成。
我不熟悉 SSE,但在您的情况下,流可能永远不会完成,因为它一直在侦听新事件。
您可以在 Akka Stream 文档中了解更多信息。
根据您想对事件执行的操作,您可以 map
在直播中像这样:
eventSource
...
.map(/* do something */)
.runWith(...)
基本上,您需要使用 Akka Stream Source
,因为数据正在通过它,但不要等待它完成。
编辑:我没有注意到 take(10)
,只有当 take
不在此处时我的回答才适用。发送 10 个事件后,您的代码应该可以正常工作。
我们如何在 scala play 框架中使用 SSE?我能找到的大部分资源都是制作 SSE 源。我想可靠地监听来自其他服务的 SSE 事件(使用自动连接)。最相关的文章是 https://doc.akka.io/docs/alpakka/current/sse.html 。我实现了这个,但这似乎不起作用(下面的代码)。还有我是su
的事件@Singleton
class SseConsumer @Inject()((implicit ec: ExecutionContext) {
implicit val system = ActorSystem()
val send: HttpRequest => Future[HttpResponse] = foo
def foo(x:HttpRequest) = {
try {
println("foo")
val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
val newHeaders = x.withHeaders(authHeader)
Http().singleRequest(newHeaders)
}catch {
case e:Exception => {
println("Exception", e.printStackTrace())
throw e
}
}
}
val eventSource: Source[ServerSentEvent, NotUsed] =
EventSource(
uri = Uri("https://abc/v1/events"),
send,
initialLastEventId = Some("2"),
retryDelay = 1.second
)
def orderStatusEventStable() = {
val events: Future[immutable.Seq[ServerSentEvent]] =
eventSource
.throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
.take(10)
.runWith(Sink.seq)
events.map(_.foreach( x => {
println("456")
println(x.data)
}))
}
Future {
blocking{
while(true){
try{
Thread.sleep(2000)
orderStatusEventStable()
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
}
这不会给出任何异常,并且永远不会打印 println("456")。
编辑:
Future {
blocking {
while(true){
try{
Await.result(orderStatusEventStable() recover {
case e: Exception => {
println("exception", e)
throw e
}
}, Duration.Inf)
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
添加了一个 await 并开始工作。一次可以阅读 10 条消息。但是现在我面临着另一个问题。 我有一个生产者,有时生产速度比我消耗的速度快,使用这段代码我有 2 个问题:
- 我必须等到 10 条消息可用。我们怎样才能采取最大。 10 分钟。共 0 条消息?
- 当生产率>消耗率时,我错过了几个事件。我猜这是由于节流造成的。我们如何使用背压来处理它?
您的代码中的问题是 events: Future
只会在流 (eventSource
) 完成时完成。
我不熟悉 SSE,但在您的情况下,流可能永远不会完成,因为它一直在侦听新事件。
您可以在 Akka Stream 文档中了解更多信息。
根据您想对事件执行的操作,您可以 map
在直播中像这样:
eventSource
...
.map(/* do something */)
.runWith(...)
基本上,您需要使用 Akka Stream Source
,因为数据正在通过它,但不要等待它完成。
编辑:我没有注意到 take(10)
,只有当 take
不在此处时我的回答才适用。发送 10 个事件后,您的代码应该可以正常工作。