如何从 Akka 中提取实时 FileIO 状态?
How do I extract the real-time FileIO state from Akka?
我正在使用 Akka 制作一个文件传输系统。我一直在看文件一段时间。目前的进度状态是Actor2收到Actor1发送的文件,并写入到Actor2的本地系统(Actor1 = sender, Actor2 = receiver)。
但是我找不到办法知道我在写入时实时收到了多少字节。
我测试了一下,原来用runWith
API,可以在本地写文件。使用runForeach
API,实时传递了多少字节。但是,如果这两个同时创建,则无法写入文件。
这是我的简单来源。请给我一些建议。
public static Behavior<Command> create() {
return Behaviors.setup(context -> {
context.getLog().info("Registering myself with receptionist");
context.getSystem().receptionist().tell(Receptionist.register(RECEIVER_SERVICE_KEY, context.getSelf().narrow()));
Materializer mat = Materializer.createMaterializer(context);
return Behaviors.receive(Command.class)
.onMessage(TransferFile.class, command -> {
command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.pptx")), mat);
//command.replyTo.tell(new FileTransfered("filename", 1024));
command.sourceRef.getSource().runForeach(f -> System.out.println(f.size()), mat);
return Behaviors.same();
}).build();
});
}
使用 BroadcastHub
允许多个消费者使用您的 Source
:
Source<ByteString, NotUsed> fileSource = command.sourceRef.getSource();
RunnableGraph<Source<ByteString, NotUsed>> runnableGraph =
fileSource.toMat(BroadcastHub.of(ByteString.class, 256), Keep.right());
// adjust the buffer size (256) as needed
Source<ByteString, NotUsed> fromFileSource = runnableGraph.run(mat);
fromFileSource.runWith(FileIO.toPath(Paths.get("test.pptx")), mat);
fromFileSource.runForeach(f -> System.out.println(f.size()), mat);
BroadcastHub
根据 Jeffrey 的建议,允许将单个 运行 流连接到随时间启动和停止的多个其他流。
拥有动态连接到其他流的流在内部需要相当多的额外环路,因此如果您不需要它,最好避免这种开销。
如果您的用例是您想要使用带有两个接收器的单个源,那么使用 source.alsoTo(sink1).to(sink2)
会更好。
流程 API 中的 alsoTo
由 Broadcast 运算符支持,但直接使用它需要您使用 Graph DSL。
我正在使用 Akka 制作一个文件传输系统。我一直在看文件一段时间。目前的进度状态是Actor2收到Actor1发送的文件,并写入到Actor2的本地系统(Actor1 = sender, Actor2 = receiver)。
但是我找不到办法知道我在写入时实时收到了多少字节。
我测试了一下,原来用runWith
API,可以在本地写文件。使用runForeach
API,实时传递了多少字节。但是,如果这两个同时创建,则无法写入文件。
这是我的简单来源。请给我一些建议。
public static Behavior<Command> create() {
return Behaviors.setup(context -> {
context.getLog().info("Registering myself with receptionist");
context.getSystem().receptionist().tell(Receptionist.register(RECEIVER_SERVICE_KEY, context.getSelf().narrow()));
Materializer mat = Materializer.createMaterializer(context);
return Behaviors.receive(Command.class)
.onMessage(TransferFile.class, command -> {
command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.pptx")), mat);
//command.replyTo.tell(new FileTransfered("filename", 1024));
command.sourceRef.getSource().runForeach(f -> System.out.println(f.size()), mat);
return Behaviors.same();
}).build();
});
}
使用 BroadcastHub
允许多个消费者使用您的 Source
:
Source<ByteString, NotUsed> fileSource = command.sourceRef.getSource();
RunnableGraph<Source<ByteString, NotUsed>> runnableGraph =
fileSource.toMat(BroadcastHub.of(ByteString.class, 256), Keep.right());
// adjust the buffer size (256) as needed
Source<ByteString, NotUsed> fromFileSource = runnableGraph.run(mat);
fromFileSource.runWith(FileIO.toPath(Paths.get("test.pptx")), mat);
fromFileSource.runForeach(f -> System.out.println(f.size()), mat);
BroadcastHub
根据 Jeffrey 的建议,允许将单个 运行 流连接到随时间启动和停止的多个其他流。
拥有动态连接到其他流的流在内部需要相当多的额外环路,因此如果您不需要它,最好避免这种开销。
如果您的用例是您想要使用带有两个接收器的单个源,那么使用 source.alsoTo(sink1).to(sink2)
会更好。
alsoTo
由 Broadcast 运算符支持,但直接使用它需要您使用 Graph DSL。