异步和 mapAsync AKKA 流的输出之间的区别
difference between output of async and mapAsync AKKA streams
我比较的是mapAsync
和async
的区别
object Demo3 extends App{
implicit val system = ActorSystem("MyDemo")
implicit val materializer = ActorMaterializer()
private val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
private def test(a:Int) ={
println(s"Flow A : ${Thread.currentThread().getName()}" )
Future(println(a+1))(ec)
}
Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
}
输出
Flow A : MyDemo-akka.actor.default-dispatcher-2
2
Flow A : MyDemo-akka.actor.default-dispatcher-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-2
Flow A : MyDemo-akka.actor.default-dispatcher-2
4
Flow A : MyDemo-akka.actor.default-dispatcher-2
5
Flow A : MyDemo-akka.actor.default-dispatcher-2
6
Flow A : MyDemo-akka.actor.default-dispatcher-2
7
Flow A : MyDemo-akka.actor.default-dispatcher-2
8
Flow A : MyDemo-akka.actor.default-dispatcher-2
9
Flow A : MyDemo-akka.actor.default-dispatcher-2
10
11
为什么尽管并行度为10它显示一个线程名称。不是运行异步吗?
当我用 Source(1 to 100).map(test).async.to(Sink.ignore).run()
行替换它时,
mapAsync
和async
是否每次都使用单线程?
输出
Flow A : MyDemo-akka.actor.default-dispatcher-4
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
9
Flow A : MyDemo-akka.actor.default-dispatcher-4
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
11
在test
中,打印线程ID的println
是在未来之外执行的,因此它是同步执行的。 Future 中的代码将在 ExecutionContext
的线程上执行(在本例中为 actor 系统的调度程序)。值得注意的是发生了一些并行执行:a = 4
的线程打印发生在 a = 3
.
的 a + 1
打印之前
如果将线程 println
移到未来,println
将异步执行:
Future {
println(s"Flow A : ${Thread.currentThread().getName()}")
println(a+1)
}(ec)
请注意,在您的测试代码中,无论如何您都不太可能看到很多并行执行:产生未来所涉及的工作量接近于未来完成的工作量(即使是第二次打印在未来),因此衍生的期货通常在下一个未来可以产生之前完成。
mapAsync
最好被认为是同步调用 returns 未来的代码(未来可能会或可能不会在返回时完成)并将该未来存储在缓冲区中尺寸 parallelism
。当该缓冲区中的未来成功完成时,它完成的值被发出并且缓冲区中的插槽被释放,允许 mapAsync
请求另一个元素(我在技术上描述 mapAsyncUnordered
因为它更简单: mapAsync
在完成之前创建的每个未来都成功完成并发出之前不会发出;我实际上并不知道后面的元素完成是否会在缓冲区中打开一个槽).这是否真的导致并行性取决于未来的细节及其完成方式(例如,如果未来每次都是同一个参与者的请求,则有效的并行性不太可能超过 1)。
在我看来 async
可能应该被称为 stageBoundary
或类似的东西,正是因为它常常使人们认为 mapAsync
和 map(...).async
有很多如果任何共同点。 async
是给 Materializer
的一个信号,即前一个 async
和这个 async
之间的阶段不应与 async
之后的阶段融合。在通常的 ActorMaterializer
中,融合阶段在单个参与者中执行。这样做的好处是消除了将元素从一个阶段转移到另一个阶段的开销,代价是通常将融合阶段中执行的元素数量限制为 1。融合阶段之间有一个隐式缓冲区:下游阶段将发出信号基于其缓冲区中的空插槽的需求。这两个阶段将并行处理,因为 async
之前的(可能融合的)阶段可以在处理元素的同时 async
之后的(可能融合的)阶段正在处理一个元素元素。这基本上是流水线执行。
所以在 Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
中,整个流被具体化为单个 actor,其中(实际上:这是一个符合流如何具体化要求的描述)在单个 actor 中(因此所有这些,除了计划在 ExecutionContext
上的任务外,按顺序同步执行):
Sink.ignore
向 mapAsync
发出有效的无限需求信号
mapAsync
有 10 个空槽,因此从源请求 10 个元素
source
发出 1
mapAsync
打印当前线程,创建一个 Promise[Unit]
,创建一个类似于 的闭包对象
new ClosureClassName { val a: Int = 1; def run(): Unit = println(a+1) }
并在 ec
上安排一个任务,它将 运行 闭包的 run
方法并完成承诺的未来。 ec
会根据ec
的逻辑将任务调度到一个线程上异步执行;与此同时,我们的演员将未来保存在其缓冲区中(称之为 futureBuffer(0)
)
- 假设当我们保存
futureBuffer(0)
时,它已经完成(使用 ()
,Unit
的单例值),mapAsync
发出 ()
到 Sink.ignore
并清除 futureBuffer(0)
Sink.ignore
,好吧,忽略收到的()
source
现在发出 2
mapAsync
执行如上,只是闭包中有 a = 2
- 这次假设由于线程调度变化无常,
futureBuffer(0)
(现在是a = 2
的未来)还没有完成,所以
source
现在发出 3
mapAsync
执行如上,在闭包中使用 a = 3
,将未来保存为 futureBuffer(1)
- 现在
futureBuffer(0)
和 futureBuffer(1)
都已完成,因此 futureBuffer(0)
的值被发送到 Sink.ignore
并且 futureBuffer(0)
被清除
Sink.ignore
忽略值
futureBuffer(1)
的值被发送到 Sink.ignore
并且 futureBuffer(1)
被清除
Sink.ignore
忽略值
所以通过mapAsync
有一点并行度:实现的并行度本质上是未完成的未来的数量。
对于 Source(1 to 100).map(test).async.to(Sink.ignore).run()
这将具体化为
Actor A (Source.map)
^
| Future[Unit] sent down, demand signal sent up
v
Actor B (Sink.ignore)
假设实体化器设置有一个每个角色 2 个元素的接收缓冲区。
- B:
Sink.ignore
向 Actor B
发出有效的无限需求信号
- B:
B
在其缓冲区中有 2 个空闲槽,因此它向 Actor A
发送消息要求 3 个元素
- A:
A
将此需求传递给 map
,后者从源中需求 1 个元素
- A:源发出 1
- A:
map
如上所述打印当前线程等。它不会将结果(可能已完成或未完成)的未来保存在缓冲区中(它没有缓冲区),而是将未来发送到 A
- A:
A
将未来传递给 B
从这里开始,A
和 B
并行处理(至少在这种情况下的某些时间,因为 B
只是将元素发送到位桶)
- A:源发出2;同时
B
将接收到的 future 传递给 Sink.ignore
,后者忽略了 future(甚至不关心 future 是否完成,甚至 future 是否失败)
依此类推...一旦 B 收到三个元素,它将发出信号要求再增加两个(假设 Sink
尚未完成忽略未来和缓冲区2 个元素为空)。
在整个过程中值得注意的是,一个 actor 可能会在消息之间更改它 运行 在哪个线程上(是否这样做取决于 ActorSystem
的调度程序),但是一个actor 保持着一种单线程错觉:它一次只使用一个线程。
您的示例中的计算并行执行 运行,它显示相同线程的原因是因为来自 Source(1 to 10)
的项目被分派给一个演员,整个流是 运行 由。如果您将 test
更改为:
private def test(a: Int) = {
println(s"Flow A : ${Thread.currentThread().getName}")
Future {
println(s"Inside future Flow A : ${Thread.currentThread().getName}")
println(a + 1)
}(ec)
}
你会看到传递给 Future
的代码的计算实际上是在线程池上执行的:
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-1
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-10
11
如果您调整流以添加 async
并在其前后记录:
Source(1 to 10)
.mapAsync(10)(test)
.wireTap(_ => println(s"after mapAsync : ${Thread.currentThread().getName}"))
.async
.wireTap(_ => println(s"after async : ${Thread.currentThread().getName}"))
.to(Sink.ignore)
.run()
您可以观察到并行执行结果是如何由同一个 akka 线程分派的,而且 async
在流中引入了异步边界:
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-1
2
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
Flow A : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-10
11
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after async : MyDemo-akka.actor.default-dispatcher-4
我比较的是mapAsync
和async
object Demo3 extends App{
implicit val system = ActorSystem("MyDemo")
implicit val materializer = ActorMaterializer()
private val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
private def test(a:Int) ={
println(s"Flow A : ${Thread.currentThread().getName()}" )
Future(println(a+1))(ec)
}
Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
}
输出
Flow A : MyDemo-akka.actor.default-dispatcher-2
2
Flow A : MyDemo-akka.actor.default-dispatcher-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-2
Flow A : MyDemo-akka.actor.default-dispatcher-2
4
Flow A : MyDemo-akka.actor.default-dispatcher-2
5
Flow A : MyDemo-akka.actor.default-dispatcher-2
6
Flow A : MyDemo-akka.actor.default-dispatcher-2
7
Flow A : MyDemo-akka.actor.default-dispatcher-2
8
Flow A : MyDemo-akka.actor.default-dispatcher-2
9
Flow A : MyDemo-akka.actor.default-dispatcher-2
10
11
为什么尽管并行度为10它显示一个线程名称。不是运行异步吗?
当我用 Source(1 to 100).map(test).async.to(Sink.ignore).run()
行替换它时,
mapAsync
和async
是否每次都使用单线程?
输出
Flow A : MyDemo-akka.actor.default-dispatcher-4
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
9
Flow A : MyDemo-akka.actor.default-dispatcher-4
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
11
在test
中,打印线程ID的println
是在未来之外执行的,因此它是同步执行的。 Future 中的代码将在 ExecutionContext
的线程上执行(在本例中为 actor 系统的调度程序)。值得注意的是发生了一些并行执行:a = 4
的线程打印发生在 a = 3
.
a + 1
打印之前
如果将线程 println
移到未来,println
将异步执行:
Future {
println(s"Flow A : ${Thread.currentThread().getName()}")
println(a+1)
}(ec)
请注意,在您的测试代码中,无论如何您都不太可能看到很多并行执行:产生未来所涉及的工作量接近于未来完成的工作量(即使是第二次打印在未来),因此衍生的期货通常在下一个未来可以产生之前完成。
mapAsync
最好被认为是同步调用 returns 未来的代码(未来可能会或可能不会在返回时完成)并将该未来存储在缓冲区中尺寸 parallelism
。当该缓冲区中的未来成功完成时,它完成的值被发出并且缓冲区中的插槽被释放,允许 mapAsync
请求另一个元素(我在技术上描述 mapAsyncUnordered
因为它更简单: mapAsync
在完成之前创建的每个未来都成功完成并发出之前不会发出;我实际上并不知道后面的元素完成是否会在缓冲区中打开一个槽).这是否真的导致并行性取决于未来的细节及其完成方式(例如,如果未来每次都是同一个参与者的请求,则有效的并行性不太可能超过 1)。
async
可能应该被称为 stageBoundary
或类似的东西,正是因为它常常使人们认为 mapAsync
和 map(...).async
有很多如果任何共同点。 async
是给 Materializer
的一个信号,即前一个 async
和这个 async
之间的阶段不应与 async
之后的阶段融合。在通常的 ActorMaterializer
中,融合阶段在单个参与者中执行。这样做的好处是消除了将元素从一个阶段转移到另一个阶段的开销,代价是通常将融合阶段中执行的元素数量限制为 1。融合阶段之间有一个隐式缓冲区:下游阶段将发出信号基于其缓冲区中的空插槽的需求。这两个阶段将并行处理,因为 async
之前的(可能融合的)阶段可以在处理元素的同时 async
之后的(可能融合的)阶段正在处理一个元素元素。这基本上是流水线执行。
所以在 Source(1 to 10).mapAsync(10)(test).to(Sink.ignore).run()
中,整个流被具体化为单个 actor,其中(实际上:这是一个符合流如何具体化要求的描述)在单个 actor 中(因此所有这些,除了计划在 ExecutionContext
上的任务外,按顺序同步执行):
Sink.ignore
向mapAsync
发出有效的无限需求信号
mapAsync
有 10 个空槽,因此从源请求 10 个元素source
发出 1mapAsync
打印当前线程,创建一个Promise[Unit]
,创建一个类似于 的闭包对象
new ClosureClassName { val a: Int = 1; def run(): Unit = println(a+1) }
并在 ec
上安排一个任务,它将 运行 闭包的 run
方法并完成承诺的未来。 ec
会根据ec
的逻辑将任务调度到一个线程上异步执行;与此同时,我们的演员将未来保存在其缓冲区中(称之为 futureBuffer(0)
)
- 假设当我们保存
futureBuffer(0)
时,它已经完成(使用()
,Unit
的单例值),mapAsync
发出()
到Sink.ignore
并清除futureBuffer(0)
Sink.ignore
,好吧,忽略收到的()
source
现在发出 2mapAsync
执行如上,只是闭包中有a = 2
- 这次假设由于线程调度变化无常,
futureBuffer(0)
(现在是a = 2
的未来)还没有完成,所以 source
现在发出 3mapAsync
执行如上,在闭包中使用a = 3
,将未来保存为futureBuffer(1)
- 现在
futureBuffer(0)
和futureBuffer(1)
都已完成,因此futureBuffer(0)
的值被发送到Sink.ignore
并且futureBuffer(0)
被清除 Sink.ignore
忽略值futureBuffer(1)
的值被发送到Sink.ignore
并且futureBuffer(1)
被清除Sink.ignore
忽略值
所以通过mapAsync
有一点并行度:实现的并行度本质上是未完成的未来的数量。
对于 Source(1 to 100).map(test).async.to(Sink.ignore).run()
这将具体化为
Actor A (Source.map)
^
| Future[Unit] sent down, demand signal sent up
v
Actor B (Sink.ignore)
假设实体化器设置有一个每个角色 2 个元素的接收缓冲区。
- B:
Sink.ignore
向Actor B
发出有效的无限需求信号
- B:
B
在其缓冲区中有 2 个空闲槽,因此它向Actor A
发送消息要求 3 个元素 - A:
A
将此需求传递给map
,后者从源中需求 1 个元素 - A:源发出 1
- A:
map
如上所述打印当前线程等。它不会将结果(可能已完成或未完成)的未来保存在缓冲区中(它没有缓冲区),而是将未来发送到A
- A:
A
将未来传递给B
从这里开始,A
和 B
并行处理(至少在这种情况下的某些时间,因为 B
只是将元素发送到位桶)
- A:源发出2;同时
B
将接收到的 future 传递给Sink.ignore
,后者忽略了 future(甚至不关心 future 是否完成,甚至 future 是否失败)
依此类推...一旦 B 收到三个元素,它将发出信号要求再增加两个(假设 Sink
尚未完成忽略未来和缓冲区2 个元素为空)。
在整个过程中值得注意的是,一个 actor 可能会在消息之间更改它 运行 在哪个线程上(是否这样做取决于 ActorSystem
的调度程序),但是一个actor 保持着一种单线程错觉:它一次只使用一个线程。
您的示例中的计算并行执行 运行,它显示相同线程的原因是因为来自 Source(1 to 10)
的项目被分派给一个演员,整个流是 运行 由。如果您将 test
更改为:
private def test(a: Int) = {
println(s"Flow A : ${Thread.currentThread().getName}")
Future {
println(s"Inside future Flow A : ${Thread.currentThread().getName}")
println(a + 1)
}(ec)
}
你会看到传递给 Future
的代码的计算实际上是在线程池上执行的:
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-1
2
Flow A : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-4
Inside future Flow A : pool-1-thread-10
11
如果您调整流以添加 async
并在其前后记录:
Source(1 to 10)
.mapAsync(10)(test)
.wireTap(_ => println(s"after mapAsync : ${Thread.currentThread().getName}"))
.async
.wireTap(_ => println(s"after async : ${Thread.currentThread().getName}"))
.to(Sink.ignore)
.run()
您可以观察到并行执行结果是如何由同一个 akka 线程分派的,而且 async
在流中引入了异步边界:
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-1
2
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
Flow A : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-2
3
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-3
4
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-4
5
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-5
6
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-6
7
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-7
8
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-8
9
Inside future Flow A : pool-1-thread-9
10
Flow A : MyDemo-akka.actor.default-dispatcher-5
Inside future Flow A : pool-1-thread-10
11
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after mapAsync : MyDemo-akka.actor.default-dispatcher-5
after async : MyDemo-akka.actor.default-dispatcher-4
after async : MyDemo-akka.actor.default-dispatcher-4