来自 Sink 的 Akka Stream return 对象

Akka Stream return object from Sink

我有一个 SourceQueue。当我为此提供一个元素时,我希望它通过 Stream 并且当它到达 Sink 时将输出返回到提供此元素的代码(类似于 Sink.head returns RunnableGraph.run() 调用的一个元素)。

我该如何实现?我的问题的一个简单示例是:

val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()

val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"

编辑: 对于我在这个问题中给出的示例,Vladimir Matveev 提供了最佳答案。但是,应该注意的是,只有当元素以与提供给 source 的顺序相同的顺序进入 sink 时,此解决方案才有效。如果不能保证 sink 中元素的顺序可能不同,结果可能与预期不同。

嗯,如果你看一下它的定义,你就知道 offer() 方法 returns :) 你可以做的是创建 Source.queue[(Promise[String], String)],创建推送对的辅助函数通过 offer 进行流式传输,确保 offer 不会因为队列已满而失败,然后在流中完成承诺并使用承诺的未来在外部代码中捕获完成事件。

我这样做是为了限制从我项目的多个地方使用的外部 API 的速率。

这是在 Typesafe 将 Hub 源添加到 akka 之前它在我的项目中的样子

import scala.concurrent.Promise
import scala.concurrent.Future
import java.util.concurrent.ConcurrentLinkedDeque

import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}

import scala.util.Success

private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure)
  .toMat(Sink.foreach({ case (p, param) =>
      p.complete(Success(param.reverse))
  }))(Keep.left)
  .run

private val futureDeque = new ConcurrentLinkedDeque[Future[String]]()

private def sendQueuedRequest(request: String): Future[String] = {

  val p = Promise[String]

  val offerFuture = queue.offer(p -> request)

  def addToQueue(future: Future[String]): Future[String] = {
    futureDeque.addLast(future)
    future.onComplete(_ => futureDeque.remove(future))
    future
  }

  offerFuture.flatMap {
    case QueueOfferResult.Enqueued =>
      addToQueue(p.future)
  }.recoverWith {
    case ex =>
      val first = futureDeque.pollFirst()
      if (first != null)
        addToQueue(first.flatMap(_ => sendQueuedRequest(request)))
      else
        sendQueuedRequest(request)
  }
}

我意识到阻塞同步队列可能是瓶颈并且可能会无限增长但是因为 API 我项目中的调用仅来自其他背压的 akka 流所以我在 [=15] 中从来没有超过十几个项目=].您的情况可能有所不同。

如果您改为创建 MergeHub.source[(Promise[String], String)](),您将获得可重复使用的水槽。因此,每次您需要处理项目时,您都将创建完整的图形并 运行 它。在这种情况下,您将不需要 hacky java 容器来排队请求。

我相信使用已经存在的原语从流中提取值更简单,称为 Sink.queue。这是一个例子:

val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))

val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()

def getNext: String = Await.result(sinkQueue.pull(), 1.second).get

sourceQueue.offer("foo")
println(getNext)

sourceQueue.offer("bar")
println(getNext)

sourceQueue.offer("baz")
println(getNext)

它完全符合您的要求。

请注意,为队列接收器设置 inputBuffer 属性对于您的用例可能重要也可能不重要 - 如果您不设置它,缓冲区大小将为零并且数据不会'在您调用接收器上的 pull() 方法之前,不会流过流。

sinkQueue.pull() 产生一个 Future[Option[T]],如果接收器接收到一个元素,它将以 Some 成功完成,或者如果流失败则失败。如果流正常完成,它将以 None 完成。在这个特定示例中,我通过使用 Option.get 忽略了这一点,但您可能希望添加自定义逻辑来处理这种情况。