如何从 Akka 中提取实时 FileIO 状态?

How do I extract the real-time FileIO state from Akka?

我正在使用 Akka 制作一个文件传输系统。我一直在看文件一段时间。目前的进度状态是Actor2收到Actor1发送的文件,并写入到Actor2的本地系统(Actor1 = sender, Actor2 = receiver)。

但是我找不到办法知道我在写入时实时收到了多少字节。

我测试了一下,原来用runWith API,可以在本地写文件。使用runForeach API,实时传递了多少字节。但是,如果这两个同时创建,则无法写入文件。

这是我的简单来源。请给我一些建议。

public static Behavior<Command> create() {
    return Behaviors.setup(context -> {
        context.getLog().info("Registering myself with receptionist");
        context.getSystem().receptionist().tell(Receptionist.register(RECEIVER_SERVICE_KEY, context.getSelf().narrow()));
        Materializer mat = Materializer.createMaterializer(context);

        return Behaviors.receive(Command.class)
                .onMessage(TransferFile.class, command -> {
                    command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.pptx")), mat);
                    //command.replyTo.tell(new FileTransfered("filename", 1024));
                    command.sourceRef.getSource().runForeach(f -> System.out.println(f.size()), mat);
                    return Behaviors.same();
                }).build();
    });
}

使用 BroadcastHub 允许多个消费者使用您的 Source:

Source<ByteString, NotUsed> fileSource = command.sourceRef.getSource();

RunnableGraph<Source<ByteString, NotUsed>> runnableGraph =
  fileSource.toMat(BroadcastHub.of(ByteString.class, 256), Keep.right());
// adjust the buffer size (256) as needed

Source<ByteString, NotUsed> fromFileSource = runnableGraph.run(mat);

fromFileSource.runWith(FileIO.toPath(Paths.get("test.pptx")), mat);
fromFileSource.runForeach(f -> System.out.println(f.size()), mat);

BroadcastHub 根据 Jeffrey 的建议,允许将单个 运行 流连接到随时间启动和停止的多个其他流。

拥有动态连接到其他流的流在内部需要相当多的额外环路,因此如果您不需要它,最好避免这种开销。

如果您的用例是您想要使用带有两个接收器的单个源,那么使用 source.alsoTo(sink1).to(sink2) 会更好。

流程 API 中的

alsoToBroadcast 运算符支持,但直接使用它需要您使用 Graph DSL。