Stream Future in Play 2.5
Stream Future in Play 2.5
我再次尝试更新一些 Play 2.5 之前的代码(基于此 vid)。例如,以下曾经是如何流式传输 Future:
Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500)))
我使用 Akka 为 Promise.timeout
(已弃用)创建了以下变通方法:
private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = {
val promise: Promise[Result] = Promise[Result]()
actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) }
promise.future
}
根据Play Framework Migration Guide; Enumerators
应该 重写为 Source 并且 Source.unfoldAsync
显然等同于 Enumerator.generateM
所以我希望这会起作用(其中 str
是 Future[String]
):
def inf = Action { request =>
val str = keepResponding("stream me", 1.second, 2.second)
Ok.chunked(Source.unfoldAsync(str))
}
当然我遇到了 类型不匹配 错误,当查看 unfoldAsync
的 class 签名时:
final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]])
我可以看到参数不正确,但我没有完全理解what/how我应该通过这个。
unfoldAsync
甚至比 Play! 自己的 generateM
更通用,因为它允许您传递状态 (S
) 值。这可以使发出的值取决于先前发出的值。
下面的示例将通过递增的 id 加载值,直到加载失败:
val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒
loadFromId(id)
.map(s ⇒ Some((id + 1, s)))
.recover{case _ ⇒ None}
}
def loadFromId(id: Int): Future[String] = ???
在你的情况下,内部状态并不是真正需要的,因此你可以在需要时传递虚拟值,例如
val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒
schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x))
}
def schedule(data: String, delay: FiniteDuration): Future[Result] = {
akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))}
}
请注意,您最初对 keepResponding
的实施是不正确的,因为您不能多次完成 Promise
。 Akka after
模式提供了一种更简单的方法来实现您的需求。
但是,请注意,在您的特定情况下,Akka Streams 提供了一个更惯用的解决方案 Source.tick
:
val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒
loadSomeFuture()
}
def loadSomeFuture(): Future[String] = ???
或者更简单,以防您实际上不需要像示例中那样的异步计算
val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me")
我再次尝试更新一些 Play 2.5 之前的代码(基于此 vid)。例如,以下曾经是如何流式传输 Future:
Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500)))
我使用 Akka 为 Promise.timeout
(已弃用)创建了以下变通方法:
private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = {
val promise: Promise[Result] = Promise[Result]()
actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) }
promise.future
}
根据Play Framework Migration Guide; Enumerators
应该 重写为 Source 并且 Source.unfoldAsync
显然等同于 Enumerator.generateM
所以我希望这会起作用(其中 str
是 Future[String]
):
def inf = Action { request =>
val str = keepResponding("stream me", 1.second, 2.second)
Ok.chunked(Source.unfoldAsync(str))
}
当然我遇到了 类型不匹配 错误,当查看 unfoldAsync
的 class 签名时:
final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]])
我可以看到参数不正确,但我没有完全理解what/how我应该通过这个。
unfoldAsync
甚至比 Play! 自己的 generateM
更通用,因为它允许您传递状态 (S
) 值。这可以使发出的值取决于先前发出的值。
下面的示例将通过递增的 id 加载值,直到加载失败:
val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒
loadFromId(id)
.map(s ⇒ Some((id + 1, s)))
.recover{case _ ⇒ None}
}
def loadFromId(id: Int): Future[String] = ???
在你的情况下,内部状态并不是真正需要的,因此你可以在需要时传递虚拟值,例如
val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒
schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x))
}
def schedule(data: String, delay: FiniteDuration): Future[Result] = {
akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))}
}
请注意,您最初对 keepResponding
的实施是不正确的,因为您不能多次完成 Promise
。 Akka after
模式提供了一种更简单的方法来实现您的需求。
但是,请注意,在您的特定情况下,Akka Streams 提供了一个更惯用的解决方案 Source.tick
:
val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒
loadSomeFuture()
}
def loadSomeFuture(): Future[String] = ???
或者更简单,以防您实际上不需要像示例中那样的异步计算
val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me")