如何在 PlayFramework 中使用 Akka Streams SourceQueue
How to use an Akka Streams SourceQueue with PlayFramework
我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中。
Play 控制器需要一个 Source 才能使用 chuncked
方法流式传输结果。
由于 Play 在后台使用它自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源会在 chunked
方法使用之前被消耗(除非我使用以下 hack) .
如果我使用反应流发布器预先实现源队列,我就能让它工作,但它是一种 'dirty hack' :
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
是否有更简单的方法将 Akka Streams SourceQueue 与 PlayFramework 结合使用?
谢谢
解决方案是在源上使用 mapMaterializedValue
来获得其队列物化的未来:
def sourceQueueAction = Action {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
Source.tick(0.second, 1.second, "tick")
.runForeach (t => queue.offer(t))
}
Ok.chunked(queueSource)
}
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
想分享我今天获得的见解,尽管它可能不适合您使用 Play 的情况。
与其考虑触发 Source
,通常可以将问题颠倒过来,并向执行采购的函数提供 Sink
。
在这种情况下,Sink
将是 "recipe" (non-materialized) 阶段,我们现在可以使用 Source.queue
并立即实现它。得到队列。得到它运行的流程。
我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中。
Play 控制器需要一个 Source 才能使用 chuncked
方法流式传输结果。
由于 Play 在后台使用它自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源会在 chunked
方法使用之前被消耗(除非我使用以下 hack) .
如果我使用反应流发布器预先实现源队列,我就能让它工作,但它是一种 'dirty hack' :
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
是否有更简单的方法将 Akka Streams SourceQueue 与 PlayFramework 结合使用?
谢谢
解决方案是在源上使用 mapMaterializedValue
来获得其队列物化的未来:
def sourceQueueAction = Action {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
Source.tick(0.second, 1.second, "tick")
.runForeach (t => queue.offer(t))
}
Ok.chunked(queueSource)
}
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
想分享我今天获得的见解,尽管它可能不适合您使用 Play 的情况。
与其考虑触发 Source
,通常可以将问题颠倒过来,并向执行采购的函数提供 Sink
。
在这种情况下,Sink
将是 "recipe" (non-materialized) 阶段,我们现在可以使用 Source.queue
并立即实现它。得到队列。得到它运行的流程。