无法使用 Akka 流获得正确分块的响应

Cannot get properly chunked response using Akka streams

使用 Play! Framework documentation 中提供的示例,我创建了一个修改版本,它发布了一些延迟事件,因此我可以观察到它们在客户端以相同的速率发生:

public Result playExampleDelayed() {
    Source<ByteString, ?> source = Source.<ByteString> actorRef(5, OverflowStrategy.dropNew())
            .mapMaterializedValue(sourceActor -> {
                for (int i = 0; i < 10; ++i) {
                    Thread.sleep(1000);
                    sourceActor.tell(ByteString.fromString("tick " + i), null);
                }
                sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                return null;
            });
    return ok().chunked(source);
}

但是,使用 curl,当源完成时,我们一步获取所有事件。

使用不同的源类型我可以获得预期的行为:

public Result tick() {
    Source<ByteString, ?> source = Source.<ByteString> tick(Duration.create(0, TimeUnit.SECONDS),
            Duration.create(1, TimeUnit.SECONDS), ByteString.fromString("tick"));
    return ok().chunked(source);
}

在这种情况下,我将每秒在控制台中获取一个块。

根据 Akka 文档,我希望第一个示例能够工作。我做错了什么?

您正在 mapMaterializedValue 通话中使用 Thread.sleep。这是在您 run() 您的流之后立即发生的同步调用。在此处阻塞(例如使用 Thread.sleep)将阻塞整个实体化。因此,所有消息都将在循环执行的最后被参与者拾取。

底线:在使用 Akka 时始终避免使用 Thread.sleep

相反,Source.tick 使用调度程序(异步、非阻塞),因此是一种性能更高、更健壮和更优雅的解决方案。