油门 Akka 问

Throttle Akka Ask

我正在尝试限制我对 consumerActor 的 ask 请求。

val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
  .throttle(10, 1.second, 1, ThrottleMode.Shaping)
  .to(Sink.foreach[Any](msg => consumerActor ! msg))
  .run()

aLotOfItems.map(items =>
  val itemsFuture = (throttler ? consumeItems(items)).mapTo[Future[String]]
  itemsFuture flatMap {x => x}
}).toVector

这确实会向 consumerActor 发送消息,但我似乎失去了响应,因为我尝试了 2 个项目,但请求只是挂起。

我想我需要将 Sink.foreach 中的 tell 更改为询问或可以处理响应的内容

解决方案:使用下面选定的答案使其正常工作。我不得不添加

val answer = Source(...) (from the selected answer below)
sender ! answer

问题是您期待 throttler 的回复,但 throttler 没有发送回复并且无法发送回复,因为它没有对原始发件人的引用。

如果 consumerActor 使用 Future[String] 回复每条 consumeItems(i) 消息的发件人,那么实现您想要做的事情的一种方法是创建一个 Source 来自 aLotOfItems 并使用 mapAsyncask 的组合来限制发送给参与者的消息。演员的回复可以累积到Sink。类似于:

val sink = Sink.seq[String]

val result =
  Source(aLotOfItems)
    .map(consumeItems(_))
    .mapAsync(parallelism = 5)(item => (consumerActor ? item).mapTo[Future[String]])
    .mapAsync(parallelism = 5)(identity)
    .runWith(sink)

// Future[Seq[String]]