Akka Java 文件 IO 节流
Akka Java File IO Throttling
我想逐行将文件内容流式传输给 Actor。我有这样的东西:
final ActorSystem system = ActorSystem.create("stream_system");
final Materializer materializer = ActorMaterializer.create(system);
final ActorRef actor = system.actorOf(Props.create(streamActor.class), "sink");
final Path file = Paths.get("path/file.txt");
Sink<ByteString, CompletionStage<Done>> printlnSink =
Sink.<ByteString> foreach(chunk -> actor.tell(chunk.utf8String(), null));
//Sink.<ByteString> actorRef(actor, null);
CompletionStage<IOResult> ioResult =
FileIO.fromPath(file)
.throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
.to(printlnSink)
.run(materializer);
未注释的版本有效,但它会一次性流式传输整个文件内容。注释版本以 "unknown" 消息结尾。
我想延迟几秒逐行发送给 Actor。任何帮助如何完成这个?接收 actor 只需获取 String 消息并将其打印在输出上。
Framing
class 可以帮助您:
CompletionStage<IOResult> ioResult =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString.fromString(System.lineSeparator()), 1000, FramingTruncation.ALLOW))
.throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
.to(printlnSink)
.run(materializer);
我想逐行将文件内容流式传输给 Actor。我有这样的东西:
final ActorSystem system = ActorSystem.create("stream_system");
final Materializer materializer = ActorMaterializer.create(system);
final ActorRef actor = system.actorOf(Props.create(streamActor.class), "sink");
final Path file = Paths.get("path/file.txt");
Sink<ByteString, CompletionStage<Done>> printlnSink =
Sink.<ByteString> foreach(chunk -> actor.tell(chunk.utf8String(), null));
//Sink.<ByteString> actorRef(actor, null);
CompletionStage<IOResult> ioResult =
FileIO.fromPath(file)
.throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
.to(printlnSink)
.run(materializer);
未注释的版本有效,但它会一次性流式传输整个文件内容。注释版本以 "unknown" 消息结尾。
我想延迟几秒逐行发送给 Actor。任何帮助如何完成这个?接收 actor 只需获取 String 消息并将其打印在输出上。
Framing
class 可以帮助您:
CompletionStage<IOResult> ioResult =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString.fromString(System.lineSeparator()), 1000, FramingTruncation.ALLOW))
.throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
.to(printlnSink)
.run(materializer);