通过反应流发送大文件

Send big file over reactive stream

我正在编写的部分应用程序需要从客户端向服务器传输任意大的文件(对于这个问题我假设 100-200 GB)。重要的是,接收器(服务器)没有存储这个文件——它只是 read/examine 流并将它发送到下一个点。因为在任何时候我都不需要整个文件,但希望同时进行多次传输,所以我想尽量减少 RAM 使用并消除磁盘​​使用。我想以 1 MB 的块处理文件。

现在,服务器使用 Spring Boot 和 Akka。

我的第一次尝试是在客户端打开缓冲文件输入流,以 1 MB 的块读取它并在单独的线程中将它们发送到消息中。它有效,但问题是客户端正在一个接一个地发送消息而不用担心服务器是否有缓冲区来存储它(缺乏背压)。

我的第二个想法是像这样使用 akka-streams:

How to use Reactive Streams for NIO binary processing?

像这样使用 ActorPublisher:

但是,如此处所述:

http://doc.akka.io/docs/akka/2.4.16/scala/stream/stream-integrations.html#Implementing_Reactive_Streams_Publisher_or_Subscriber

“警告 ActorPublisher 和 ActorSubscriber 可能会在 Akka 的未来版本中被弃用。

警告 ActorPublisher 和 ActorSubscriber 不能与远程 actor 一起使用,因为如果 Reactive Streams 协议(例如请求)的信号丢失,流可能会死锁。"

这看起来不是个好主意。

我不想将其保存在任何存储提供商(保管箱、google 驱动器等)中,因为我想即时分析数据。我有 Spring 5 和 Akka,但我可以使用任何其他软件来解决这个问题。原始套接字将缺乏背压,并且种子不能保证 sequential/ordered 读写(我需要)。

主要问题是:如何将大文件从客户端流式传输到服务器,假设服务器不能立即将文件存储在磁盘或 ram 中?

额外的问题是:如何计算"correct"这种传输中块的大小?

几天来我一直在寻找答案,看起来我不是唯一遇到此类问题的人,但是没有指出其他适当的替代解决方案的答案或类似 "don't do it" 的答案。

Akka 流专门为此用例提供功能:streaming File IO。来自文档:

import akka.stream.scaladsl._
val file = Paths.get("example.csv")

val foreach: Future[IOResult] = 
  FileIO.fromPath(file)
        .to(Sink.ignore)
        .run()

关于 "correct size" 大块的奖励问题;这在很大程度上取决于您的硬件和软件配置。最好的办法是编写一个测试客户端并调整块大小,直到为您的服务器找到 "sweet spot"。