Akka Streams 运行 异步流动

Akka Streams run flow asynchronously

我已经测试了异步运行的简单异步流,但我很惊讶它不是。我需要一些额外的配置吗?

@Configuration
class StreamingConfiguration
{
 
    @Bean
    Materializer materializer(ActorSystem actorSystem)
    {
        return ActorMaterializer.create(actorSystem);
    }

    @PostConstruct
    public void test(Materializer materializer)
    {
        var takePart = Flow.of(String.class).map(path -> {
            var start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < 3000) {}
            return path;
        });

        Source.from(Lists.newArrayList("A", "B", "C", "D"))
            .via(takePart.async())
            .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
            .run(materializer)
            .toCompletableFuture()
            .join();
    }
}

我可以看到 materializer 有默认的 fork-join-pool 调度程序

编辑:抱歉,您的示例也不起作用。使用 mapAsync 仍然需要 12~ 秒才能完成。我尝试 flatMapMerge 结果相同:/

   Function<String, CompletionStage<String>> blocking = s -> {
            try
            {
                Thread.sleep(3000);

            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            return CompletableFuture.completedFuture(s);
        };


        Source.from(List.of("A", "B", "C", "D"))
                .mapAsync(4, blocking)
                .toMat(Sink.fold("", (arg1, arg2) -> arg1), Keep.right())
                .run(actorSystem)
                .toCompletableFuture()
                .join();

Akka Streams 默认情况下将流阶段具体化为单个参与者:这避免了在流阶段之间传递消息的开销,但这确实意味着流的第二个元素在第一个元素完成其工作之前不会被消耗通过流的方式。

流中的 async 运算符意味着到此为止的流将在其自己的 actor 中执行。在您的示例代码中:

  • Source将成为演员
  • takePart 流将是一个演员
  • Sink将成为演员

这些中的每一个仍然不允许同时处理一个以上的元素:没有 async 的好处是 SourceSink 可以有一个正在处理的元素与 takePart 同时有一个正在处理的元素。下游阶段也有一个小的隐式缓冲区来提高吞吐量,但这通常会被忽略。

在此流中,takePart 阶段处理一个元素需要 3 秒,而 SourceSink 需要几微秒(为了便于说明,我们将假设 Source 需要 5 微秒,而 Sink 需要 15 微秒)。所以粗略的时间顺序是(忽略缓冲区):

  • 时间 0:takePartSource
  • 发出需求信号
  • 时间 5 us:Source 将 A 发射到 takePart
  • 时间 3 秒 + 5 us:takePartSink 发送 A,向 Source
  • 发出需求信号
  • 时间 3 秒 + 10 us:Source 将 B 发射到 takePart
  • 时间 3 秒 + 20 us:Sink 处理 A,向 takePart
  • 发出需求信号
  • 时间 6 秒 + 10 us:takePart 将 B 发送到 Sink,向 Source
  • 发出需求信号
  • 时间 6 秒 + 15 us:Source 将 C 发射到 takePart
  • 时间 6 秒 + 25 us:Sink 处理 B,向 takePart
  • 发出需求信号
  • 时间 9 秒 + 15 us:takePartSink 发出 C,向 Source
  • 发出需求信号
  • 时间 9 秒 + 20 us:Source 将 D 发射到 takePart
  • 时间 9 秒 + 30 us:Sink 处理 C,向 takePart
  • 发出需求信号
  • 时间 12 秒 + 20 us:takePartSink 发送 D,向 Source 发出请求信号,Source 完成,takePart 完成
  • 时间 12 秒 + 35 us:Sink 处理 D,完成

如果没有 async,流将在 4 * (3 sec + 20 us) 内完成,因此 async 节省了 45 us(累计,此流中的 async 将节省 15 us第一个之后的每个元素),所以收益不大。充分利用的管道流的吞吐量由最慢的部分控制(您可以想象一条限速下降的高速公路:如果交通量大到使高速公路饱和,限速下降之前高速公路上的速度将是速度下降后的限制):如果 async 的每一侧以大致相同的速率处理元素,您将获得最佳结果。

Akka Streams API 中“异步”的另一种用法有点令人困惑,用于表示通过获取 Futures (Scala) 或 CompletionStages (Java):完成Future/CompletionStage的进程可能运行在不同的线程上,stream stage通常包含一些对数量的限制Futures/CompletionStages 允许一次飞行。 mapAsync 就是一个例子。

在 Scala 中(我通常不熟悉 Java 未来的 APIs),这类似于(忽略设置隐式 ExecutionContext 等):

def blockOnElement(e: String): Future[String] = Future {
  Thread.sleep(3000)
  e
}

Source(List("A", "B", "C", "D"))
  .mapAsync(4)(blockOnElement)
  .runWith(Sink.fold("") { (acc, _) => acc })

在那里,假设调度程序中有足够的(超过 4 个)线程,整个流应该在大约 3 秒和 80 us(SourceSink 仍然会在每个元素上花费 20 us。

除了@Alec 提到的 flatMapMerge 之外,通过使用 Source.singleSink.head 运行 mapAsync 中的子流通常很有用:接收器的物化值将是输出元素的 Future/CompletionStage,而 mapAsync 将依次保留下游排序(与 flatMapMerge 相反)。