无法使用 Akka 流获得正确分块的响应
Cannot get properly chunked response using Akka streams
使用 Play! Framework documentation 中提供的示例,我创建了一个修改版本,它发布了一些延迟事件,因此我可以观察到它们在客户端以相同的速率发生:
public Result playExampleDelayed() {
Source<ByteString, ?> source = Source.<ByteString> actorRef(5, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
for (int i = 0; i < 10; ++i) {
Thread.sleep(1000);
sourceActor.tell(ByteString.fromString("tick " + i), null);
}
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
return null;
});
return ok().chunked(source);
}
但是,使用 curl
,当源完成时,我们一步获取所有事件。
使用不同的源类型我可以获得预期的行为:
public Result tick() {
Source<ByteString, ?> source = Source.<ByteString> tick(Duration.create(0, TimeUnit.SECONDS),
Duration.create(1, TimeUnit.SECONDS), ByteString.fromString("tick"));
return ok().chunked(source);
}
在这种情况下,我将每秒在控制台中获取一个块。
根据 Akka 文档,我希望第一个示例能够工作。我做错了什么?
您正在 mapMaterializedValue
通话中使用 Thread.sleep
。这是在您 run()
您的流之后立即发生的同步调用。在此处阻塞(例如使用 Thread.sleep
)将阻塞整个实体化。因此,所有消息都将在循环执行的最后被参与者拾取。
底线:在使用 Akka 时始终避免使用 Thread.sleep
。
相反,Source.tick
使用调度程序(异步、非阻塞),因此是一种性能更高、更健壮和更优雅的解决方案。
使用 Play! Framework documentation 中提供的示例,我创建了一个修改版本,它发布了一些延迟事件,因此我可以观察到它们在客户端以相同的速率发生:
public Result playExampleDelayed() {
Source<ByteString, ?> source = Source.<ByteString> actorRef(5, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
for (int i = 0; i < 10; ++i) {
Thread.sleep(1000);
sourceActor.tell(ByteString.fromString("tick " + i), null);
}
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
return null;
});
return ok().chunked(source);
}
但是,使用 curl
,当源完成时,我们一步获取所有事件。
使用不同的源类型我可以获得预期的行为:
public Result tick() {
Source<ByteString, ?> source = Source.<ByteString> tick(Duration.create(0, TimeUnit.SECONDS),
Duration.create(1, TimeUnit.SECONDS), ByteString.fromString("tick"));
return ok().chunked(source);
}
在这种情况下,我将每秒在控制台中获取一个块。
根据 Akka 文档,我希望第一个示例能够工作。我做错了什么?
您正在 mapMaterializedValue
通话中使用 Thread.sleep
。这是在您 run()
您的流之后立即发生的同步调用。在此处阻塞(例如使用 Thread.sleep
)将阻塞整个实体化。因此,所有消息都将在循环执行的最后被参与者拾取。
底线:在使用 Akka 时始终避免使用 Thread.sleep
。
相反,Source.tick
使用调度程序(异步、非阻塞),因此是一种性能更高、更健壮和更优雅的解决方案。