akka stream Source.actorRef buffer 和 OverflowStrategy 的 Sink fold
Sink fold for akka stream Source.actorRef buffer and OverflowStrategy
这是来自 akka documentation
的代码片段
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 3.seconds)
assert(result == "123")
这是一个有效的代码片段,但是,如果我使用 ref 来告诉另一条消息,如 ref ! 4
,我会得到一个异常,如 akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)
我想缓冲区大小 3 应该足够了。原因是折叠操作是(acc, ele) => acc,所以它需要累加器和元素到return新值累加器。
所以我更改了代码让另一个演员告诉等待 3 秒。它又开始工作了。
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
Thread.sleep(3000)
ref ! 4
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 10.seconds)
println(result)
但是,我的问题是有没有一种方法可以告诉 Akka 流放慢速度或等待接收器可用。我也在用OverflowStrategy.backpressure
,但是它说Backpressure overflowStrategy not supported
。
还有其他选择吗?
您应该研究 Source.queue
作为一种以背压感知方式从外部将元素注入流的方法。
Source.queue
将具体化为一个队列对象,您可以向其提供元素,但是当您提供它们时,您将返回一个 Future
,该对象在流准备好接受消息时完成。
示例如下:
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (queue, future): (SourceQueueWithComplete[Int], Future[String]) =
Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
Future.sequence(Seq(
queue.offer(1),
queue.offer(2),
queue.offer(3),
queue.offer(4)
))
queue.complete()
val result = Await.result(future, 10.seconds)
println(result)
docs 中的更多信息。
这是来自 akka documentation
的代码片段val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 3.seconds)
assert(result == "123")
这是一个有效的代码片段,但是,如果我使用 ref 来告诉另一条消息,如 ref ! 4
,我会得到一个异常,如 akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)
我想缓冲区大小 3 应该足够了。原因是折叠操作是(acc, ele) => acc,所以它需要累加器和元素到return新值累加器。
所以我更改了代码让另一个演员告诉等待 3 秒。它又开始工作了。
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
Thread.sleep(3000)
ref ! 4
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 10.seconds)
println(result)
但是,我的问题是有没有一种方法可以告诉 Akka 流放慢速度或等待接收器可用。我也在用OverflowStrategy.backpressure
,但是它说Backpressure overflowStrategy not supported
。
还有其他选择吗?
您应该研究 Source.queue
作为一种以背压感知方式从外部将元素注入流的方法。
Source.queue
将具体化为一个队列对象,您可以向其提供元素,但是当您提供它们时,您将返回一个 Future
,该对象在流准备好接受消息时完成。
示例如下:
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (queue, future): (SourceQueueWithComplete[Int], Future[String]) =
Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
Future.sequence(Seq(
queue.offer(1),
queue.offer(2),
queue.offer(3),
queue.offer(4)
))
queue.complete()
val result = Await.result(future, 10.seconds)
println(result)
docs 中的更多信息。