油门 Akka 问
Throttle Akka Ask
我正在尝试限制我对 consumerActor 的 ask
请求。
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(10, 1.second, 1, ThrottleMode.Shaping)
.to(Sink.foreach[Any](msg => consumerActor ! msg))
.run()
和
aLotOfItems.map(items =>
val itemsFuture = (throttler ? consumeItems(items)).mapTo[Future[String]]
itemsFuture flatMap {x => x}
}).toVector
这确实会向 consumerActor 发送消息,但我似乎失去了响应,因为我尝试了 2 个项目,但请求只是挂起。
我想我需要将 Sink.foreach
中的 tell
更改为询问或可以处理响应的内容
解决方案:使用下面选定的答案使其正常工作。我不得不添加
val answer = Source(...) (from the selected answer below)
sender ! answer
问题是您期待 throttler
的回复,但 throttler
没有发送回复并且无法发送回复,因为它没有对原始发件人的引用。
如果 consumerActor
使用 Future[String]
回复每条 consumeItems(i)
消息的发件人,那么实现您想要做的事情的一种方法是创建一个 Source
来自 aLotOfItems
并使用 mapAsync
和 ask
的组合来限制发送给参与者的消息。演员的回复可以累积到Sink
。类似于:
val sink = Sink.seq[String]
val result =
Source(aLotOfItems)
.map(consumeItems(_))
.mapAsync(parallelism = 5)(item => (consumerActor ? item).mapTo[Future[String]])
.mapAsync(parallelism = 5)(identity)
.runWith(sink)
// Future[Seq[String]]
我正在尝试限制我对 consumerActor 的 ask
请求。
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(10, 1.second, 1, ThrottleMode.Shaping)
.to(Sink.foreach[Any](msg => consumerActor ! msg))
.run()
和
aLotOfItems.map(items =>
val itemsFuture = (throttler ? consumeItems(items)).mapTo[Future[String]]
itemsFuture flatMap {x => x}
}).toVector
这确实会向 consumerActor 发送消息,但我似乎失去了响应,因为我尝试了 2 个项目,但请求只是挂起。
我想我需要将 Sink.foreach
中的 tell
更改为询问或可以处理响应的内容
解决方案:使用下面选定的答案使其正常工作。我不得不添加
val answer = Source(...) (from the selected answer below)
sender ! answer
问题是您期待 throttler
的回复,但 throttler
没有发送回复并且无法发送回复,因为它没有对原始发件人的引用。
如果 consumerActor
使用 Future[String]
回复每条 consumeItems(i)
消息的发件人,那么实现您想要做的事情的一种方法是创建一个 Source
来自 aLotOfItems
并使用 mapAsync
和 ask
的组合来限制发送给参与者的消息。演员的回复可以累积到Sink
。类似于:
val sink = Sink.seq[String]
val result =
Source(aLotOfItems)
.map(consumeItems(_))
.mapAsync(parallelism = 5)(item => (consumerActor ? item).mapTo[Future[String]])
.mapAsync(parallelism = 5)(identity)
.runWith(sink)
// Future[Seq[String]]