如何用akka remote和steam实现文件容错上传

How to implement fault tolerant file upload with akka remote and steam

我是 Akka 初学者。 (我正在使用 Java)

我正在使用 Akka 制作文件传输系统。

目前,我已经完成发送Actor1(Local) -> Actor2(Remote)文件。

现在,

当我传输文件遇到问题时,我在想如何解决。 然后我有一个问题。题目如下

如果我在传输文件时失去网络连接,则文件传输失败(完成 90%)。 我会在几分钟后恢复我的网络连接。

是否可以传输剩余的文件数据? (剩余 10%)

如果可以,请多多指教

这是我的简单代码。 谢谢:)

演员 1(本地)

private Behavior<Event> onTick() {
    ....
    String fileName = "test.zip";
    Source<ByteString, CompletionStage<IOResult>> logs = FileIO.fromPath(Paths.get(fileName));
    logs.runForeach(f -> originalSize += f.size(), mat).thenRun(() -> System.out.println("originalSize : " + originalSize));
    SourceRef<ByteString> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
    getContext().ask(
            Receiver.FileTransfered.class,
            selectedReceiver,
            timeout,
            responseRef -> new Receiver.TransferFile(logsRef, responseRef, fileName),
            (response, failure) -> {
                if (response != null) {
                    return new TransferCompleted(fileName, response.transferedSize);
                } else {
                    return new JobFailed("Processing timed out", fileName);
                }
            }
    );
}

演员 2(远程)

public static Behavior<Command> create() {
    return Behaviors.setup(context -> {
        ...
        Materializer mat = Materializer.createMaterializer(context);
        return Behaviors.receive(Command.class)
                .onMessage(TransferFile.class, command -> {
                    command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.zip")), mat);
                    command.replyTo.tell(new FileTransfered("filename", 1024));
                    return Behaviors.same();
                }).build();
    });
}

您需要考虑以下内容以正确实施具有容错功能的文件传输:

  1. 如何确定必须为给定的 文件恢复传输.
  2. 如何找到恢复传输的

以下实现对 1 和 2 做了非常简单的假设。

  1. 文件名是唯一的,因此可以用于此类识别。严格来说,这是不正确的,例如,您可以从不同文件夹传输同名文件。或者来自不同的节点等。您将不得不根据您的用例重新调整它。
  2. 假定接收端的last/all写入正确写入了所有字节,写入字节总数指示恢复传输的点。如果不能保证这一点,您需要在逻辑上将原始文件分成块,并将每个块的哈希值、大小和位置传输到接收方,接收方必须在其端验证块并找到正确的指针以恢复传输。
  3. (比 2 多了一点 :) )这个实现忽略了传输问题的识别,而是关注 1 和 2。

代码:

object Sender {
  sealed trait Command
  case class Upload(file: String) extends Command
  case class StartWithIndex(file: String, index: Long) extends Sender.Command

  def behavior(receiver: ActorRef[Receiver.Command]): Behavior[Sender.Command] = Behaviors.setup[Sender.Command] { ctx =>
    implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
    Behaviors.receiveMessage {
      case Upload(file) =>
        receiver.tell(Receiver.InitUpload(file, ctx.self.narrow[StartWithIndex]))
        ctx.log.info(s"Initiating upload of $file")
        Behaviors.same
      case StartWithIndex(file, starWith) =>
        val source = FileIO.fromPath(Paths.get(file), chunkSize = 8192, starWith)
        val ref = source.runWith(StreamRefs.sourceRef())
        ctx.log.info(s"Starting upload of $file")
        receiver.tell(Receiver.Upload(file, ref))
        Behaviors.same
    }
  }
}
object Receiver {
  sealed trait Command

  case class InitUpload(file: String, replyTo: ActorRef[Sender.StartWithIndex]) extends Command

  case class Upload(file: String, fileSource: SourceRef[ByteString]) extends Command

  val behavior: Behavior[Receiver.Command] = Behaviors.setup[Receiver.Command] { ctx =>
    implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
    Behaviors.receiveMessage {
      case InitUpload(path, replyTo) =>
        val file = fileAtDestination(path)
        val index = if (file.exists()) file.length else 0
        ctx.log.info(s"Got init command for $file at pointer $index")
        replyTo.tell(Sender.StartWithIndex(path, index.toLong))
        Behaviors.same
      case Upload(path, fileSource) =>
        val file = fileAtDestination(path)
        val sink = if (file.exists()) {
          FileIO.toPath(file.toPath, Set(StandardOpenOption.APPEND, StandardOpenOption.WRITE))
        } else {
          FileIO.toPath(file.toPath, Set(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))
        }
        ctx.log.info(s"Saving file into ${file.toPath}")
        fileSource.runWith(sink)
        Behaviors.same
    }
  }
}

一些辅助方法

val destination: File = Files.createTempDirectory("destination").toFile

def fileAtDestination(file: String) = {
  val name = new File(file).getName
  new File(destination, name)
}

def writeRandomToFile(file: File, size: Int): Unit = {
  val out = new FileOutputStream(file, true)
  (0 until size).foreach { _ =>
    out.write(Random.nextPrintableChar())
  }
  out.close()
}

最后是一些测试代码

// sender and receiver bootstrapping is omitted

//Create some dummy file to upload
val file: Path = Files.createTempFile("test", "test")
writeRandomToFile(file.toFile, 1000)

//Initiate a new upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
// Sleep to allow file upload to finish
Thread.sleep(1000)

//Write more data to the file to emulate a failure
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload that will "recover" from the previous upload 
sender.tell(Sender.Upload(file.toAbsolutePath.toString))

最后整个过程可以定义为