异步和 mapAsync AKKA 流的输出之间的区别

difference between output of async and mapAsync AKKA streams

我比较的是mapAsyncasync

的区别
object Demo3 extends App{

  implicit val system       = ActorSystem("MyDemo")
  implicit val materializer = ActorMaterializer()
  private val ec             = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  private def test(a:Int) ={
    println(s"Flow A : ${Thread.currentThread().getName()}" )
     Future(println(a+1))(ec)
  }

  Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()

}

输出

    Flow A : MyDemo-akka.actor.default-dispatcher-2
    2
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    3
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    4
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    5
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    6
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    7
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    8
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    9
    Flow A : MyDemo-akka.actor.default-dispatcher-2
    10
    11

为什么尽管并行度为10它显示一个线程名称。不是运行异步吗? 当我用 Source(1 to 100).map(test).async.to(Sink.ignore).run() 行替换它时, mapAsyncasync是否每次都使用单线程?

输出

    Flow A : MyDemo-akka.actor.default-dispatcher-4
    2
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    3
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    4
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    5
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    6
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    7
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    8
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    9
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    10
    Flow A : MyDemo-akka.actor.default-dispatcher-4
    11

test中,打印线程ID的println是在未来之外执行的,因此它是同步执行的。 Future 中的代码将在 ExecutionContext 的线程上执行(在本例中为 actor 系统的调度程序)。值得注意的是发生了一些并行执行:a = 4 的线程打印发生在 a = 3.

a + 1 打印之前

如果将线程 println 移到未来,println 将异步执行:

Future {
  println(s"Flow A : ${Thread.currentThread().getName()}")
  println(a+1)
}(ec)

请注意,在您的测试代码中,无论如何您都不太可能看到很多并行执行:产生未来所涉及的工作量接近于未来完成的工作量(即使是第二次打印在未来),因此衍生的期货通常在下一个未来可以产生之前完成。

mapAsync 最好被认为是同步调用 returns 未来的代码(未来可能会或可能不会在返回时完成)并将该未来存储在缓冲区中尺寸 parallelism。当该缓冲区中的未来成功完成时,它完成的值被发出并且缓冲区中的插槽被释放,允许 mapAsync 请求另一个元素(我在技术上描述 mapAsyncUnordered 因为它更简单: mapAsync 在完成之前创建的每个未来都成功完成并发出之前不会发出;我实际上并不知道后面的元素完成是否会在缓冲区中打开一个槽).这是否真的导致并行性取决于未来的细节及其完成方式(例如,如果未来每次都是同一个参与者的请求,则有效的并行性不太可能超过 1)。

在我看来

async 可能应该被称为 stageBoundary 或类似的东西,正是因为它常常使人们认为 mapAsyncmap(...).async 有很多如果任何共同点。 async 是给 Materializer 的一个信号,即前一个 async 和这个 async 之间的阶段不应与 async 之后的阶段融合。在通常的 ActorMaterializer 中,融合阶段在单个参与者中执行。这样做的好处是消除了将元素从一个阶段转移到另一个阶段的开销,代价是通常将融合阶段中执行的元素数量限制为 1。融合阶段之间有一个隐式缓冲区:下游阶段将发出信号基于其缓冲区中的空插槽的需求。这两个阶段将并行处理,因为 async 之前的(可能融合的)阶段可以在处理元素的同时 async 之后的(可能融合的)阶段正在处理一个元素元素。这基本上是流水线执行。

所以在 Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run() 中,整个流被具体化为单个 actor,其中(实际上:这是一个符合流如何具体化要求的描述)在单个 actor 中(因此所有这些,除了计划在 ExecutionContext 上的任务外,按顺序同步执行):

  • Sink.ignoremapAsync
  • 发出有效的无限需求信号
  • mapAsync 有 10 个空槽,因此从源请求 10 个元素
  • source 发出 1
  • mapAsync 打印当前线程,创建一个 Promise[Unit],创建一个类似于
  • 的闭包对象
new ClosureClassName { val a: Int = 1; def run(): Unit = println(a+1) }

并在 ec 上安排一个任务,它将 运行 闭包的 run 方法并完成承诺的未来。 ec会根据ec的逻辑将任务调度到一个线程上异步执行;与此同时,我们的演员将未来保存在其缓冲区中(称之为 futureBuffer(0)

  • 假设当我们保存 futureBuffer(0) 时,它已经完成(使用 ()Unit 的单例值),mapAsync 发出 ()Sink.ignore 并清除 futureBuffer(0)
  • Sink.ignore,好吧,忽略收到的()
  • source 现在发出 2
  • mapAsync 执行如上,只是闭包中有 a = 2
  • 这次假设由于线程调度变化无常,futureBuffer(0)(现在是a = 2的未来)还没有完成,所以
  • source 现在发出 3
  • mapAsync 执行如上,在闭包中使用 a = 3,将未来保存为 futureBuffer(1)
  • 现在 futureBuffer(0)futureBuffer(1) 都已完成,因此 futureBuffer(0) 的值被发送到 Sink.ignore 并且 futureBuffer(0) 被清除
  • Sink.ignore忽略值
  • futureBuffer(1) 的值被发送到 Sink.ignore 并且 futureBuffer(1) 被清除
  • Sink.ignore忽略值

所以通过mapAsync有一点并行度:实现的并行度本质上是未完成的未来的数量。

对于 Source(1 to 100).map(test).async.to(Sink.ignore).run() 这将具体化为

Actor A (Source.map)
  ^
  |      Future[Unit] sent down, demand signal sent up
  v
Actor B (Sink.ignore)

假设实体化器设置有一个每个角色 2 个元素的接收缓冲区。

  • B:Sink.ignoreActor B
  • 发出有效的无限需求信号
  • B: B 在其缓冲区中有 2 个空闲槽,因此它向 Actor A 发送消息要求 3 个元素
  • A:A 将此需求传递给 map,后者从源中需求 1 个元素
  • A:源发出 1
  • A: map 如上所述打印当前线程等。它不会将结果(可能已完成或未完成)的未来保存在缓冲区中(它没有缓冲区),而是将未来发送到 A
  • A: A 将未来传递给 B

从这里开始,AB 并行处理(至少在这种情况下的某些时间,因为 B 只是将元素发送到位桶)

  • A:源发出2;同时 B 将接收到的 future 传递给 Sink.ignore,后者忽略了 future(甚至不关心 future 是否完成,甚至 future 是否失败)

依此类推...一旦 B 收到三个元素,它将发出信号要求再增加两个(假设 Sink 尚未完成忽略未来和缓冲区2 个元素为空)。

在整个过程中值得注意的是,一个 actor 可能会在消息之间更改它 运行 在哪个线程上(是否这样做取决于 ActorSystem 的调度程序),但是一个actor 保持着一种单线程错觉:它一次只使用一个线程。

您的示例中的计算并行执行 运行,它显示相同线程的原因是因为来自 Source(1 to 10) 的项目被分派给一个演员,整个流是 运行 由。如果您将 test 更改为:

private def test(a: Int) = {
    println(s"Flow A : ${Thread.currentThread().getName}")
    Future {
      println(s"Inside future Flow A : ${Thread.currentThread().getName}")
      println(a + 1)
    }(ec)
  }

你会看到传递给 Future 的代码的计算实际上是在线程池上执行的:

Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-1
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-10
11

如果您调整流以添加 async 并在其前后记录:

Source(1 to 10)
    .mapAsync(10)(test)
    .wireTap(_ => println(s"after mapAsync : ${Thread.currentThread().getName}"))
    .async
    .wireTap(_ => println(s"after async : ${Thread.currentThread().getName}"))
    .to(Sink.ignore)
    .run()

您可以观察到并行执行结果是如何由同一个 akka 线程分派的,而且 async 在流中引入了异步边界:

Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-1
2
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
Flow A : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-10
11
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after async : MyDemo-akka.actor.default-dispatcher-4