如何在 PlayFramework 中使用 Akka Streams SourceQueue

How to use an Akka Streams SourceQueue with PlayFramework

我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中。 Play 控制器需要一个 Source 才能使用 chuncked 方法流式传输结果。
由于 Play 在后台使用它自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源会在 chunked 方法使用之前被消耗(除非我使用以下 hack) .

如果我使用反应流发布器预先实现源队列,我就能让它工作,但它是一种 'dirty hack' :

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  }

是否有更简单的方法将 Akka Streams SourceQueue 与 PlayFramework 结合使用?

谢谢

解决方案是在源上使用 mapMaterializedValue 来获得其队列物化的未来:

def sourceQueueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map { queue =>
      Source.tick(0.second, 1.second, "tick")
        .runForeach (t => queue.offer(t))
    }
    Ok.chunked(queueSource)

  }

  //T is the source type, here String
  //M is the materialization type, here a SourceQueue[String]
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }

想分享我今天获得的见解,尽管它可能不适合您使用 Play 的情况。

与其考虑触发 Source,通常可以将问题颠倒过来,并向执行采购的函数提供 Sink

在这种情况下,Sink 将是 "recipe" (non-materialized) 阶段,我们现在可以使用 Source.queue 并立即实现它。得到队列。得到它运行的流程。