来自 Sink 的 Akka Stream return 对象
Akka Stream return object from Sink
我有一个 SourceQueue
。当我为此提供一个元素时,我希望它通过 Stream
并且当它到达 Sink
时将输出返回到提供此元素的代码(类似于 Sink.head
returns RunnableGraph.run()
调用的一个元素)。
我该如何实现?我的问题的一个简单示例是:
val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()
val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"
编辑: 对于我在这个问题中给出的示例,Vladimir Matveev 提供了最佳答案。但是,应该注意的是,只有当元素以与提供给 source
的顺序相同的顺序进入 sink
时,此解决方案才有效。如果不能保证 sink
中元素的顺序可能不同,结果可能与预期不同。
嗯,如果你看一下它的定义,你就知道 offer()
方法 returns :) 你可以做的是创建 Source.queue[(Promise[String], String)]
,创建推送对的辅助函数通过 offer
进行流式传输,确保 offer
不会因为队列已满而失败,然后在流中完成承诺并使用承诺的未来在外部代码中捕获完成事件。
我这样做是为了限制从我项目的多个地方使用的外部 API 的速率。
这是在 Typesafe 将 Hub 源添加到 akka 之前它在我的项目中的样子
import scala.concurrent.Promise
import scala.concurrent.Future
import java.util.concurrent.ConcurrentLinkedDeque
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import scala.util.Success
private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure)
.toMat(Sink.foreach({ case (p, param) =>
p.complete(Success(param.reverse))
}))(Keep.left)
.run
private val futureDeque = new ConcurrentLinkedDeque[Future[String]]()
private def sendQueuedRequest(request: String): Future[String] = {
val p = Promise[String]
val offerFuture = queue.offer(p -> request)
def addToQueue(future: Future[String]): Future[String] = {
futureDeque.addLast(future)
future.onComplete(_ => futureDeque.remove(future))
future
}
offerFuture.flatMap {
case QueueOfferResult.Enqueued =>
addToQueue(p.future)
}.recoverWith {
case ex =>
val first = futureDeque.pollFirst()
if (first != null)
addToQueue(first.flatMap(_ => sendQueuedRequest(request)))
else
sendQueuedRequest(request)
}
}
我意识到阻塞同步队列可能是瓶颈并且可能会无限增长但是因为 API 我项目中的调用仅来自其他背压的 akka 流所以我在 [=15] 中从来没有超过十几个项目=].您的情况可能有所不同。
如果您改为创建 MergeHub.source[(Promise[String], String)]()
,您将获得可重复使用的水槽。因此,每次您需要处理项目时,您都将创建完整的图形并 运行 它。在这种情况下,您将不需要 hacky java 容器来排队请求。
我相信使用已经存在的原语从流中提取值更简单,称为 Sink.queue
。这是一个例子:
val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))
val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()
def getNext: String = Await.result(sinkQueue.pull(), 1.second).get
sourceQueue.offer("foo")
println(getNext)
sourceQueue.offer("bar")
println(getNext)
sourceQueue.offer("baz")
println(getNext)
它完全符合您的要求。
请注意,为队列接收器设置 inputBuffer
属性对于您的用例可能重要也可能不重要 - 如果您不设置它,缓冲区大小将为零并且数据不会'在您调用接收器上的 pull()
方法之前,不会流过流。
sinkQueue.pull()
产生一个 Future[Option[T]]
,如果接收器接收到一个元素,它将以 Some
成功完成,或者如果流失败则失败。如果流正常完成,它将以 None
完成。在这个特定示例中,我通过使用 Option.get
忽略了这一点,但您可能希望添加自定义逻辑来处理这种情况。
我有一个 SourceQueue
。当我为此提供一个元素时,我希望它通过 Stream
并且当它到达 Sink
时将输出返回到提供此元素的代码(类似于 Sink.head
returns RunnableGraph.run()
调用的一个元素)。
我该如何实现?我的问题的一个简单示例是:
val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()
val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"
编辑: 对于我在这个问题中给出的示例,Vladimir Matveev 提供了最佳答案。但是,应该注意的是,只有当元素以与提供给 source
的顺序相同的顺序进入 sink
时,此解决方案才有效。如果不能保证 sink
中元素的顺序可能不同,结果可能与预期不同。
嗯,如果你看一下它的定义,你就知道 offer()
方法 returns :) 你可以做的是创建 Source.queue[(Promise[String], String)]
,创建推送对的辅助函数通过 offer
进行流式传输,确保 offer
不会因为队列已满而失败,然后在流中完成承诺并使用承诺的未来在外部代码中捕获完成事件。
我这样做是为了限制从我项目的多个地方使用的外部 API 的速率。
这是在 Typesafe 将 Hub 源添加到 akka 之前它在我的项目中的样子
import scala.concurrent.Promise
import scala.concurrent.Future
import java.util.concurrent.ConcurrentLinkedDeque
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import scala.util.Success
private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure)
.toMat(Sink.foreach({ case (p, param) =>
p.complete(Success(param.reverse))
}))(Keep.left)
.run
private val futureDeque = new ConcurrentLinkedDeque[Future[String]]()
private def sendQueuedRequest(request: String): Future[String] = {
val p = Promise[String]
val offerFuture = queue.offer(p -> request)
def addToQueue(future: Future[String]): Future[String] = {
futureDeque.addLast(future)
future.onComplete(_ => futureDeque.remove(future))
future
}
offerFuture.flatMap {
case QueueOfferResult.Enqueued =>
addToQueue(p.future)
}.recoverWith {
case ex =>
val first = futureDeque.pollFirst()
if (first != null)
addToQueue(first.flatMap(_ => sendQueuedRequest(request)))
else
sendQueuedRequest(request)
}
}
我意识到阻塞同步队列可能是瓶颈并且可能会无限增长但是因为 API 我项目中的调用仅来自其他背压的 akka 流所以我在 [=15] 中从来没有超过十几个项目=].您的情况可能有所不同。
如果您改为创建 MergeHub.source[(Promise[String], String)]()
,您将获得可重复使用的水槽。因此,每次您需要处理项目时,您都将创建完整的图形并 运行 它。在这种情况下,您将不需要 hacky java 容器来排队请求。
我相信使用已经存在的原语从流中提取值更简单,称为 Sink.queue
。这是一个例子:
val source = Source.queue[String](128, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1))
val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()
def getNext: String = Await.result(sinkQueue.pull(), 1.second).get
sourceQueue.offer("foo")
println(getNext)
sourceQueue.offer("bar")
println(getNext)
sourceQueue.offer("baz")
println(getNext)
它完全符合您的要求。
请注意,为队列接收器设置 inputBuffer
属性对于您的用例可能重要也可能不重要 - 如果您不设置它,缓冲区大小将为零并且数据不会'在您调用接收器上的 pull()
方法之前,不会流过流。
sinkQueue.pull()
产生一个 Future[Option[T]]
,如果接收器接收到一个元素,它将以 Some
成功完成,或者如果流失败则失败。如果流正常完成,它将以 None
完成。在这个特定示例中,我通过使用 Option.get
忽略了这一点,但您可能希望添加自定义逻辑来处理这种情况。