Akka Streams 运行 异步流动
Akka Streams run flow asynchronously
我已经测试了异步运行的简单异步流,但我很惊讶它不是。我需要一些额外的配置吗?
@Configuration
class StreamingConfiguration
{
@Bean
Materializer materializer(ActorSystem actorSystem)
{
return ActorMaterializer.create(actorSystem);
}
@PostConstruct
public void test(Materializer materializer)
{
var takePart = Flow.of(String.class).map(path -> {
var start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 3000) {}
return path;
});
Source.from(Lists.newArrayList("A", "B", "C", "D"))
.via(takePart.async())
.toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
.run(materializer)
.toCompletableFuture()
.join();
}
}
我可以看到 materializer 有默认的 fork-join-pool 调度程序
编辑:抱歉,您的示例也不起作用。使用 mapAsync
仍然需要 12~ 秒才能完成。我尝试 flatMapMerge
结果相同:/
Function<String, CompletionStage<String>> blocking = s -> {
try
{
Thread.sleep(3000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
return CompletableFuture.completedFuture(s);
};
Source.from(List.of("A", "B", "C", "D"))
.mapAsync(4, blocking)
.toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
.run(actorSystem)
.toCompletableFuture()
.join();
Akka Streams 默认情况下将流阶段具体化为单个参与者:这避免了在流阶段之间传递消息的开销,但这确实意味着流的第二个元素在第一个元素完成其工作之前不会被消耗通过流的方式。
流中的 async
运算符意味着到此为止的流将在其自己的 actor 中执行。在您的示例代码中:
Source
将成为演员
takePart
流将是一个演员
Sink
将成为演员
这些中的每一个仍然不允许同时处理一个以上的元素:没有 async
的好处是 Source
和 Sink
可以有一个正在处理的元素与 takePart
同时有一个正在处理的元素。下游阶段也有一个小的隐式缓冲区来提高吞吐量,但这通常会被忽略。
在此流中,takePart
阶段处理一个元素需要 3 秒,而 Source
和 Sink
需要几微秒(为了便于说明,我们将假设 Source
需要 5 微秒,而 Sink
需要 15 微秒)。所以粗略的时间顺序是(忽略缓冲区):
- 时间 0:
takePart
向 Source
发出需求信号
- 时间 5 us:
Source
将 A 发射到 takePart
- 时间 3 秒 + 5 us:
takePart
向 Sink
发送 A,向 Source
发出需求信号
- 时间 3 秒 + 10 us:
Source
将 B 发射到 takePart
- 时间 3 秒 + 20 us:
Sink
处理 A,向 takePart
发出需求信号
- 时间 6 秒 + 10 us:
takePart
将 B 发送到 Sink
,向 Source
发出需求信号
- 时间 6 秒 + 15 us:
Source
将 C 发射到 takePart
- 时间 6 秒 + 25 us:
Sink
处理 B,向 takePart
发出需求信号
- 时间 9 秒 + 15 us:
takePart
向 Sink
发出 C,向 Source
发出需求信号
- 时间 9 秒 + 20 us:
Source
将 D 发射到 takePart
- 时间 9 秒 + 30 us:
Sink
处理 C,向 takePart
发出需求信号
- 时间 12 秒 + 20 us:
takePart
向 Sink
发送 D,向 Source
发出请求信号,Source
完成,takePart
完成
- 时间 12 秒 + 35 us:
Sink
处理 D,完成
如果没有 async
,流将在 4 * (3 sec + 20 us)
内完成,因此 async
节省了 45 us(累计,此流中的 async
将节省 15 us第一个之后的每个元素),所以收益不大。充分利用的管道流的吞吐量由最慢的部分控制(您可以想象一条限速下降的高速公路:如果交通量大到使高速公路饱和,限速下降之前高速公路上的速度将是速度下降后的限制):如果 async
的每一侧以大致相同的速率处理元素,您将获得最佳结果。
Akka Streams API 中“异步”的另一种用法有点令人困惑,用于表示通过获取 Future
s (Scala) 或 CompletionStage
s (Java):完成Future
/CompletionStage
的进程可能运行在不同的线程上,stream stage通常包含一些对数量的限制Future
s/CompletionStage
s 允许一次飞行。 mapAsync
就是一个例子。
在 Scala 中(我通常不熟悉 Java 未来的 APIs),这类似于(忽略设置隐式 ExecutionContext
等):
def blockOnElement(e: String): Future[String] = Future {
Thread.sleep(3000)
e
}
Source(List("A", "B", "C", "D"))
.mapAsync(4)(blockOnElement)
.runWith(Sink.fold("") { (acc, _) => acc })
在那里,假设调度程序中有足够的(超过 4 个)线程,整个流应该在大约 3 秒和 80 us(Source
和Sink
仍然会在每个元素上花费 20 us。
除了@Alec 提到的 flatMapMerge
之外,通过使用 Source.single
和 Sink.head
运行 mapAsync
中的子流通常很有用:接收器的物化值将是输出元素的 Future
/CompletionStage
,而 mapAsync
将依次保留下游排序(与 flatMapMerge
相反)。
我已经测试了异步运行的简单异步流,但我很惊讶它不是。我需要一些额外的配置吗?
@Configuration
class StreamingConfiguration
{
@Bean
Materializer materializer(ActorSystem actorSystem)
{
return ActorMaterializer.create(actorSystem);
}
@PostConstruct
public void test(Materializer materializer)
{
var takePart = Flow.of(String.class).map(path -> {
var start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 3000) {}
return path;
});
Source.from(Lists.newArrayList("A", "B", "C", "D"))
.via(takePart.async())
.toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
.run(materializer)
.toCompletableFuture()
.join();
}
}
我可以看到 materializer 有默认的 fork-join-pool 调度程序
编辑:抱歉,您的示例也不起作用。使用 mapAsync
仍然需要 12~ 秒才能完成。我尝试 flatMapMerge
结果相同:/
Function<String, CompletionStage<String>> blocking = s -> {
try
{
Thread.sleep(3000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
return CompletableFuture.completedFuture(s);
};
Source.from(List.of("A", "B", "C", "D"))
.mapAsync(4, blocking)
.toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
.run(actorSystem)
.toCompletableFuture()
.join();
Akka Streams 默认情况下将流阶段具体化为单个参与者:这避免了在流阶段之间传递消息的开销,但这确实意味着流的第二个元素在第一个元素完成其工作之前不会被消耗通过流的方式。
流中的 async
运算符意味着到此为止的流将在其自己的 actor 中执行。在您的示例代码中:
Source
将成为演员takePart
流将是一个演员Sink
将成为演员
这些中的每一个仍然不允许同时处理一个以上的元素:没有 async
的好处是 Source
和 Sink
可以有一个正在处理的元素与 takePart
同时有一个正在处理的元素。下游阶段也有一个小的隐式缓冲区来提高吞吐量,但这通常会被忽略。
在此流中,takePart
阶段处理一个元素需要 3 秒,而 Source
和 Sink
需要几微秒(为了便于说明,我们将假设 Source
需要 5 微秒,而 Sink
需要 15 微秒)。所以粗略的时间顺序是(忽略缓冲区):
- 时间 0:
takePart
向Source
发出需求信号
- 时间 5 us:
Source
将 A 发射到takePart
- 时间 3 秒 + 5 us:
takePart
向Sink
发送 A,向Source
发出需求信号
- 时间 3 秒 + 10 us:
Source
将 B 发射到takePart
- 时间 3 秒 + 20 us:
Sink
处理 A,向takePart
发出需求信号
- 时间 6 秒 + 10 us:
takePart
将 B 发送到Sink
,向Source
发出需求信号
- 时间 6 秒 + 15 us:
Source
将 C 发射到takePart
- 时间 6 秒 + 25 us:
Sink
处理 B,向takePart
发出需求信号
- 时间 9 秒 + 15 us:
takePart
向Sink
发出 C,向Source
发出需求信号
- 时间 9 秒 + 20 us:
Source
将 D 发射到takePart
- 时间 9 秒 + 30 us:
Sink
处理 C,向takePart
发出需求信号
- 时间 12 秒 + 20 us:
takePart
向Sink
发送 D,向Source
发出请求信号,Source
完成,takePart
完成 - 时间 12 秒 + 35 us:
Sink
处理 D,完成
如果没有 async
,流将在 4 * (3 sec + 20 us)
内完成,因此 async
节省了 45 us(累计,此流中的 async
将节省 15 us第一个之后的每个元素),所以收益不大。充分利用的管道流的吞吐量由最慢的部分控制(您可以想象一条限速下降的高速公路:如果交通量大到使高速公路饱和,限速下降之前高速公路上的速度将是速度下降后的限制):如果 async
的每一侧以大致相同的速率处理元素,您将获得最佳结果。
Akka Streams API 中“异步”的另一种用法有点令人困惑,用于表示通过获取 Future
s (Scala) 或 CompletionStage
s (Java):完成Future
/CompletionStage
的进程可能运行在不同的线程上,stream stage通常包含一些对数量的限制Future
s/CompletionStage
s 允许一次飞行。 mapAsync
就是一个例子。
在 Scala 中(我通常不熟悉 Java 未来的 APIs),这类似于(忽略设置隐式 ExecutionContext
等):
def blockOnElement(e: String): Future[String] = Future {
Thread.sleep(3000)
e
}
Source(List("A", "B", "C", "D"))
.mapAsync(4)(blockOnElement)
.runWith(Sink.fold("") { (acc, _) => acc })
在那里,假设调度程序中有足够的(超过 4 个)线程,整个流应该在大约 3 秒和 80 us(Source
和Sink
仍然会在每个元素上花费 20 us。
除了@Alec 提到的 flatMapMerge
之外,通过使用 Source.single
和 Sink.head
运行 mapAsync
中的子流通常很有用:接收器的物化值将是输出元素的 Future
/CompletionStage
,而 mapAsync
将依次保留下游排序(与 flatMapMerge
相反)。