您将如何 "connect" 许多独立的图来维持它们之间的背压?
How would you "connect" many independent graphs maintaining backpressure between them?
关于akka-streams
的一系列问题我还有一个问题。
变量:
- 具有节流的单个 http 客户端流
- 想要同时使用第一个流的多个其他流
目标:
单个 http 流是向特定 API 发出请求的流,限制了对它的调用次数。否则它禁止我。因此,无论我的代码中有多少客户端使用请求率,保持请求率都非常重要。
有许多其他流想要向提到的 API 发出请求,但我想从 http 流中获得背压。通常你将整个事物连接到一个图表并且它有效。但就我而言,我有多个图表。
你会怎么解决?
我的尝试是:
我将 Source.queue
用于 http 流,以便我可以对 http 请求进行排队并进行限制。问题是如果我超过请求数,SourceQueue.offer
中的 Future
会失败。因此,当先前提供的事件完成时,我需要以某种方式 "reoffer" 。因此,从 SourceQueue
修改 Future
会背压其他发出 http 请求的图(在它们的 mapAsync
内)。
我是这样实现的
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
private val queueHttp = Source.queue[(String, Promise[String])](2, OverflowStrategy.backpressure)
.throttle(1, FiniteDuration(1000, MILLISECONDS), 1, ThrottleMode.Shaping)
.mapAsync(4) {
case (text, promise) =>
// Simulate delay of http request
val delay = (Random.nextDouble() * 1000 / 2).toLong
Thread.sleep(delay)
Future.successful(text -> promise)
}
.toMat(Sink.foreach({
case (text, p) =>
p.success(text)
}))(Keep.left)
.run
val futureDeque = new ConcurrentLinkedDeque[Future[String]]()
def sendRequest(value: String): Future[String] = {
val p = Promise[String]()
val offerFuture = queueHttp.offer(value -> p)
def addToQueue(future: Future[String]): Future[String] = {
futureDeque.addLast(future)
future.onComplete {
case _ => futureDeque.remove(future)
}
future
}
offerFuture.flatMap {
case QueueOfferResult.Enqueued =>
addToQueue(p.future)
}.recoverWith {
case ex =>
val first = futureDeque.pollFirst()
if (first != null)
addToQueue(first.flatMap(_ => sendRequest(value)))
else
sendRequest(value)
}
}
def main(args: Array[String]) {
val allFutures = for (v <- 0 until 15)
yield {
val res = sendRequest(s"Text $v")
res.onSuccess {
case text =>
println("> " + text)
}
res
}
Future.sequence(allFutures).onComplete {
case Success(text) =>
println(s">>> TOTAL: ${text.length} [in queue: ${futureDeque.size()}]")
system.terminate()
case Failure(ex) =>
ex.printStackTrace()
system.terminate()
}
Await.result(system.whenTerminated, Duration.Inf)
}
}
此解决方案的缺点是我锁定了 ConcurrentLinkedDeque
,这对于每秒 1 个请求的速率来说可能还不错,但仍然如此。
你会如何解决这个任务?
我们有一个开放的票证 (https://github.com/akka/akka/issues/19478) 和一些关于 "Hub" 阶段的想法,这将允许动态组合流,但恐怕我不能给你任何估计什么时候将完成。
这就是我们 Akka 团队解决任务的方式。 ;)
关于akka-streams
的一系列问题我还有一个问题。
变量:
- 具有节流的单个 http 客户端流
- 想要同时使用第一个流的多个其他流
目标:
单个 http 流是向特定 API 发出请求的流,限制了对它的调用次数。否则它禁止我。因此,无论我的代码中有多少客户端使用请求率,保持请求率都非常重要。
有许多其他流想要向提到的 API 发出请求,但我想从 http 流中获得背压。通常你将整个事物连接到一个图表并且它有效。但就我而言,我有多个图表。
你会怎么解决?
我的尝试是:
我将 Source.queue
用于 http 流,以便我可以对 http 请求进行排队并进行限制。问题是如果我超过请求数,SourceQueue.offer
中的 Future
会失败。因此,当先前提供的事件完成时,我需要以某种方式 "reoffer" 。因此,从 SourceQueue
修改 Future
会背压其他发出 http 请求的图(在它们的 mapAsync
内)。
我是这样实现的
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
private val queueHttp = Source.queue[(String, Promise[String])](2, OverflowStrategy.backpressure)
.throttle(1, FiniteDuration(1000, MILLISECONDS), 1, ThrottleMode.Shaping)
.mapAsync(4) {
case (text, promise) =>
// Simulate delay of http request
val delay = (Random.nextDouble() * 1000 / 2).toLong
Thread.sleep(delay)
Future.successful(text -> promise)
}
.toMat(Sink.foreach({
case (text, p) =>
p.success(text)
}))(Keep.left)
.run
val futureDeque = new ConcurrentLinkedDeque[Future[String]]()
def sendRequest(value: String): Future[String] = {
val p = Promise[String]()
val offerFuture = queueHttp.offer(value -> p)
def addToQueue(future: Future[String]): Future[String] = {
futureDeque.addLast(future)
future.onComplete {
case _ => futureDeque.remove(future)
}
future
}
offerFuture.flatMap {
case QueueOfferResult.Enqueued =>
addToQueue(p.future)
}.recoverWith {
case ex =>
val first = futureDeque.pollFirst()
if (first != null)
addToQueue(first.flatMap(_ => sendRequest(value)))
else
sendRequest(value)
}
}
def main(args: Array[String]) {
val allFutures = for (v <- 0 until 15)
yield {
val res = sendRequest(s"Text $v")
res.onSuccess {
case text =>
println("> " + text)
}
res
}
Future.sequence(allFutures).onComplete {
case Success(text) =>
println(s">>> TOTAL: ${text.length} [in queue: ${futureDeque.size()}]")
system.terminate()
case Failure(ex) =>
ex.printStackTrace()
system.terminate()
}
Await.result(system.whenTerminated, Duration.Inf)
}
}
此解决方案的缺点是我锁定了 ConcurrentLinkedDeque
,这对于每秒 1 个请求的速率来说可能还不错,但仍然如此。
你会如何解决这个任务?
我们有一个开放的票证 (https://github.com/akka/akka/issues/19478) 和一些关于 "Hub" 阶段的想法,这将允许动态组合流,但恐怕我不能给你任何估计什么时候将完成。
这就是我们 Akka 团队解决任务的方式。 ;)