如何用akka remote和steam实现文件容错上传
How to implement fault tolerant file upload with akka remote and steam
我是 Akka 初学者。 (我正在使用 Java)
我正在使用 Akka 制作文件传输系统。
目前,我已经完成发送Actor1(Local) -> Actor2(Remote)文件。
现在,
当我传输文件遇到问题时,我在想如何解决。
然后我有一个问题。题目如下
如果我在传输文件时失去网络连接,则文件传输失败(完成 90%)。
我会在几分钟后恢复我的网络连接。
是否可以传输剩余的文件数据? (剩余 10%)
如果可以,请多多指教
这是我的简单代码。
谢谢:)
演员 1(本地)
private Behavior<Event> onTick() {
....
String fileName = "test.zip";
Source<ByteString, CompletionStage<IOResult>> logs = FileIO.fromPath(Paths.get(fileName));
logs.runForeach(f -> originalSize += f.size(), mat).thenRun(() -> System.out.println("originalSize : " + originalSize));
SourceRef<ByteString> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
getContext().ask(
Receiver.FileTransfered.class,
selectedReceiver,
timeout,
responseRef -> new Receiver.TransferFile(logsRef, responseRef, fileName),
(response, failure) -> {
if (response != null) {
return new TransferCompleted(fileName, response.transferedSize);
} else {
return new JobFailed("Processing timed out", fileName);
}
}
);
}
演员 2(远程)
public static Behavior<Command> create() {
return Behaviors.setup(context -> {
...
Materializer mat = Materializer.createMaterializer(context);
return Behaviors.receive(Command.class)
.onMessage(TransferFile.class, command -> {
command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.zip")), mat);
command.replyTo.tell(new FileTransfered("filename", 1024));
return Behaviors.same();
}).build();
});
}
您需要考虑以下内容以正确实施具有容错功能的文件传输:
- 如何确定必须为给定的 文件恢复传输.
- 如何找到恢复传输的点。
以下实现对 1 和 2 做了非常简单的假设。
- 文件名是唯一的,因此可以用于此类识别。严格来说,这是不正确的,例如,您可以从不同文件夹传输同名文件。或者来自不同的节点等。您将不得不根据您的用例重新调整它。
- 假定接收端的last/all写入正确写入了所有字节,写入字节总数指示恢复传输的点。如果不能保证这一点,您需要在逻辑上将原始文件分成块,并将每个块的哈希值、大小和位置传输到接收方,接收方必须在其端验证块并找到正确的指针以恢复传输。
- (比 2 多了一点 :) )这个实现忽略了传输问题的识别,而是关注 1 和 2。
代码:
object Sender {
sealed trait Command
case class Upload(file: String) extends Command
case class StartWithIndex(file: String, index: Long) extends Sender.Command
def behavior(receiver: ActorRef[Receiver.Command]): Behavior[Sender.Command] = Behaviors.setup[Sender.Command] { ctx =>
implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
Behaviors.receiveMessage {
case Upload(file) =>
receiver.tell(Receiver.InitUpload(file, ctx.self.narrow[StartWithIndex]))
ctx.log.info(s"Initiating upload of $file")
Behaviors.same
case StartWithIndex(file, starWith) =>
val source = FileIO.fromPath(Paths.get(file), chunkSize = 8192, starWith)
val ref = source.runWith(StreamRefs.sourceRef())
ctx.log.info(s"Starting upload of $file")
receiver.tell(Receiver.Upload(file, ref))
Behaviors.same
}
}
}
object Receiver {
sealed trait Command
case class InitUpload(file: String, replyTo: ActorRef[Sender.StartWithIndex]) extends Command
case class Upload(file: String, fileSource: SourceRef[ByteString]) extends Command
val behavior: Behavior[Receiver.Command] = Behaviors.setup[Receiver.Command] { ctx =>
implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
Behaviors.receiveMessage {
case InitUpload(path, replyTo) =>
val file = fileAtDestination(path)
val index = if (file.exists()) file.length else 0
ctx.log.info(s"Got init command for $file at pointer $index")
replyTo.tell(Sender.StartWithIndex(path, index.toLong))
Behaviors.same
case Upload(path, fileSource) =>
val file = fileAtDestination(path)
val sink = if (file.exists()) {
FileIO.toPath(file.toPath, Set(StandardOpenOption.APPEND, StandardOpenOption.WRITE))
} else {
FileIO.toPath(file.toPath, Set(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))
}
ctx.log.info(s"Saving file into ${file.toPath}")
fileSource.runWith(sink)
Behaviors.same
}
}
}
一些辅助方法
val destination: File = Files.createTempDirectory("destination").toFile
def fileAtDestination(file: String) = {
val name = new File(file).getName
new File(destination, name)
}
def writeRandomToFile(file: File, size: Int): Unit = {
val out = new FileOutputStream(file, true)
(0 until size).foreach { _ =>
out.write(Random.nextPrintableChar())
}
out.close()
}
最后是一些测试代码
// sender and receiver bootstrapping is omitted
//Create some dummy file to upload
val file: Path = Files.createTempFile("test", "test")
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
// Sleep to allow file upload to finish
Thread.sleep(1000)
//Write more data to the file to emulate a failure
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload that will "recover" from the previous upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
最后整个过程可以定义为
我是 Akka 初学者。 (我正在使用 Java)
我正在使用 Akka 制作文件传输系统。
目前,我已经完成发送Actor1(Local) -> Actor2(Remote)文件。
现在,
当我传输文件遇到问题时,我在想如何解决。 然后我有一个问题。题目如下
如果我在传输文件时失去网络连接,则文件传输失败(完成 90%)。 我会在几分钟后恢复我的网络连接。
是否可以传输剩余的文件数据? (剩余 10%)
如果可以,请多多指教
这是我的简单代码。 谢谢:)
演员 1(本地)
private Behavior<Event> onTick() {
....
String fileName = "test.zip";
Source<ByteString, CompletionStage<IOResult>> logs = FileIO.fromPath(Paths.get(fileName));
logs.runForeach(f -> originalSize += f.size(), mat).thenRun(() -> System.out.println("originalSize : " + originalSize));
SourceRef<ByteString> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
getContext().ask(
Receiver.FileTransfered.class,
selectedReceiver,
timeout,
responseRef -> new Receiver.TransferFile(logsRef, responseRef, fileName),
(response, failure) -> {
if (response != null) {
return new TransferCompleted(fileName, response.transferedSize);
} else {
return new JobFailed("Processing timed out", fileName);
}
}
);
}
演员 2(远程)
public static Behavior<Command> create() {
return Behaviors.setup(context -> {
...
Materializer mat = Materializer.createMaterializer(context);
return Behaviors.receive(Command.class)
.onMessage(TransferFile.class, command -> {
command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.zip")), mat);
command.replyTo.tell(new FileTransfered("filename", 1024));
return Behaviors.same();
}).build();
});
}
您需要考虑以下内容以正确实施具有容错功能的文件传输:
- 如何确定必须为给定的 文件恢复传输.
- 如何找到恢复传输的点。
以下实现对 1 和 2 做了非常简单的假设。
- 文件名是唯一的,因此可以用于此类识别。严格来说,这是不正确的,例如,您可以从不同文件夹传输同名文件。或者来自不同的节点等。您将不得不根据您的用例重新调整它。
- 假定接收端的last/all写入正确写入了所有字节,写入字节总数指示恢复传输的点。如果不能保证这一点,您需要在逻辑上将原始文件分成块,并将每个块的哈希值、大小和位置传输到接收方,接收方必须在其端验证块并找到正确的指针以恢复传输。
- (比 2 多了一点 :) )这个实现忽略了传输问题的识别,而是关注 1 和 2。
代码:
object Sender {
sealed trait Command
case class Upload(file: String) extends Command
case class StartWithIndex(file: String, index: Long) extends Sender.Command
def behavior(receiver: ActorRef[Receiver.Command]): Behavior[Sender.Command] = Behaviors.setup[Sender.Command] { ctx =>
implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
Behaviors.receiveMessage {
case Upload(file) =>
receiver.tell(Receiver.InitUpload(file, ctx.self.narrow[StartWithIndex]))
ctx.log.info(s"Initiating upload of $file")
Behaviors.same
case StartWithIndex(file, starWith) =>
val source = FileIO.fromPath(Paths.get(file), chunkSize = 8192, starWith)
val ref = source.runWith(StreamRefs.sourceRef())
ctx.log.info(s"Starting upload of $file")
receiver.tell(Receiver.Upload(file, ref))
Behaviors.same
}
}
}
object Receiver {
sealed trait Command
case class InitUpload(file: String, replyTo: ActorRef[Sender.StartWithIndex]) extends Command
case class Upload(file: String, fileSource: SourceRef[ByteString]) extends Command
val behavior: Behavior[Receiver.Command] = Behaviors.setup[Receiver.Command] { ctx =>
implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
Behaviors.receiveMessage {
case InitUpload(path, replyTo) =>
val file = fileAtDestination(path)
val index = if (file.exists()) file.length else 0
ctx.log.info(s"Got init command for $file at pointer $index")
replyTo.tell(Sender.StartWithIndex(path, index.toLong))
Behaviors.same
case Upload(path, fileSource) =>
val file = fileAtDestination(path)
val sink = if (file.exists()) {
FileIO.toPath(file.toPath, Set(StandardOpenOption.APPEND, StandardOpenOption.WRITE))
} else {
FileIO.toPath(file.toPath, Set(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))
}
ctx.log.info(s"Saving file into ${file.toPath}")
fileSource.runWith(sink)
Behaviors.same
}
}
}
一些辅助方法
val destination: File = Files.createTempDirectory("destination").toFile
def fileAtDestination(file: String) = {
val name = new File(file).getName
new File(destination, name)
}
def writeRandomToFile(file: File, size: Int): Unit = {
val out = new FileOutputStream(file, true)
(0 until size).foreach { _ =>
out.write(Random.nextPrintableChar())
}
out.close()
}
最后是一些测试代码
// sender and receiver bootstrapping is omitted
//Create some dummy file to upload
val file: Path = Files.createTempFile("test", "test")
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
// Sleep to allow file upload to finish
Thread.sleep(1000)
//Write more data to the file to emulate a failure
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload that will "recover" from the previous upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
最后整个过程可以定义为