您将如何 "connect" 许多独立的图来维持它们之间的背压?

How would you "connect" many independent graphs maintaining backpressure between them?

关于akka-streams的一系列问题我还有一个问题。

变量:

目标:

单个 http 流是向特定 API 发出请求的流,限制了对它的调用次数。否则它禁止我。因此,无论我的代码中有多少客户端使用请求率,保持请求率都非常重要。

有许多其他流想要向提到的 API 发出请求,但我想从 http 流中获得背压。通常你将整个事物连接到一个图表并且它有效。但就我而言,我有多个图表。

你会怎么解决?

我的尝试是:

我将 Source.queue 用于 http 流,以便我可以对 http 请求进行排队并进行限制。问题是如果我超过请求数,SourceQueue.offer 中的 Future 会失败。因此,当先前提供的事件完成时,我需要以某种方式 "reoffer" 。因此,从 SourceQueue 修改 Future 会背压其他发出 http 请求的图(在它们的 mapAsync 内)。

我是这样实现的

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  private val queueHttp = Source.queue[(String, Promise[String])](2, OverflowStrategy.backpressure)
    .throttle(1, FiniteDuration(1000, MILLISECONDS), 1, ThrottleMode.Shaping)
    .mapAsync(4) {
      case (text, promise) =>
        // Simulate delay of http request
        val delay = (Random.nextDouble() * 1000 / 2).toLong
        Thread.sleep(delay)
        Future.successful(text -> promise)
    }
    .toMat(Sink.foreach({
      case (text, p) =>
        p.success(text)
    }))(Keep.left)
    .run

  val futureDeque = new ConcurrentLinkedDeque[Future[String]]()

  def sendRequest(value: String): Future[String] = {

    val p = Promise[String]()
    val offerFuture = queueHttp.offer(value -> p)

    def addToQueue(future: Future[String]): Future[String] = {
      futureDeque.addLast(future)
      future.onComplete {
        case _ => futureDeque.remove(future)
      }
      future
    }

    offerFuture.flatMap {
      case QueueOfferResult.Enqueued =>
        addToQueue(p.future)
    }.recoverWith {
      case ex =>
        val first = futureDeque.pollFirst()
        if (first != null)
          addToQueue(first.flatMap(_ => sendRequest(value)))
        else
          sendRequest(value)
    }
  }

  def main(args: Array[String]) {

    val allFutures = for (v <- 0 until 15)
      yield {
        val res = sendRequest(s"Text $v")
        res.onSuccess {
          case text =>
            println("> " + text)
        }
        res
      }

    Future.sequence(allFutures).onComplete {
      case Success(text) =>
        println(s">>> TOTAL: ${text.length} [in queue: ${futureDeque.size()}]")
        system.terminate()
      case Failure(ex) =>
        ex.printStackTrace()
        system.terminate()
    }

    Await.result(system.whenTerminated, Duration.Inf)
  }
}

此解决方案的缺点是我锁定了 ConcurrentLinkedDeque,这对于每秒 1 个请求的速率来说可能还不错,但仍然如此。

你会如何解决这个任务?

我们有一个开放的票证 (https://github.com/akka/akka/issues/19478) 和一些关于 "Hub" 阶段的想法,这将允许动态组合流,但恐怕我不能给你任何估计什么时候将完成。

这就是我们 Akka 团队解决任务的方式。 ;)