如果本地磁盘上 space 空间不足,如何将大型数据集上传到云文件系统(S3、HDFS)?

How is a large dataset uploaded to a cloud file system (S3, HDFS) if there isn't enough space on local disk?

我有一个项目处理在 EMR 上使用 Spark 处理数据。

据我所知,人们通常将他们的输入数据存储在某些文件系统(HDFS、S3 或本地)上,然后对其进行操作。如果数据非常大,我们不想存储在本地。

我的问题是,如果我生成了一堆数据,您如何将这些数据远程存储在 S3 或首先存在的任何云文件系统上?难道我不需要先把数据存到本地再存到云端吗?

我问这个是因为目前,我使用的服务有一个 returns Spark Dataset 对象的方法。我不太确定在调用该方法和通过 EMR 上的 Spark 处理它之间的工作流程如何。

在处理 Spark 和任何分布式存储时请记住,Spark 集群中有一定数量的节点。

虽然 Dataset 转换是从名为 driver 的集群的单个节点进行操作的,但通常的做法是永远不会在此类集群中的单个节点上收集所有处理过的数据。集群中 executor 角色的每个节点在将其摄取到 Spark、处理和存储回某种存储的过程中使用整个数据的一小部分。

通过这种方法,单个节点的限制不会限制集群可以处理的数据量。

对象存储连接器倾向于以块的形式写入数据;对于每个分区,工作通过 Hadoop FS API 创建一个文件,路径类似于 s3://bucket/dest/__temporary/0/task_0001/part-0001.csv,返回工作人员写入的输出流,仅此而已。

我不知道封闭源代码的 EMR s3 连接器,上面有 ASF S3A 供您检查

  1. 数据缓冲到fs.s3a.blocksize的值;默认 = `32M,即 32MB
  2. 缓冲到磁盘(默认)、堆(数组)或堆外字节缓冲区S3ADataBlocks
  3. 写入数据时,一旦达到缓冲区阈值,就会上传该块(单独的线程);创建了一个新的块缓冲区。 S3ABlockOutputStream.write
  4. 当调用流的 close() 方法时,任何未完成的数据都会被 PUT 到 S3,然后线程阻塞,直到全部上传完毕。 S3ABlockOutputStream.close

上传是在一个单独的线程中进行的,因此即使网络速度很慢,您也可以稍微更快地生成数据,块在最后。您需要的 disk/ram 数量与所有上传数据的工作人员的所有未完成块一样多。用于上传的线程池是共享的并且大小有限,因此您可以调整参数以限制这些值。虽然这通常只有在您尝试在内存中缓冲时才需要。

当队列填满时,工作线程写入 S3 输出流块,通过 SemaphoredDelegatingExecutor

您需要的本地存储量取决于:

  • spark 工作线程数
  • 他们生成的数据率
  • 您必须上传数据的 threads/http 个连接数
  • 从VM到S3的带宽(极限)
  • 任何节流 S3 都会对许多客户端写入存储桶的同一位进行操作

那是 S3A 连接器; EMR s3 会有所不同,但同样,上传带宽将成为瓶颈。我认为它也有一些东西可以阻止工作人员创建比网络可以处理的更多数据。

无论如何:对于 Spark 和它在下面使用的 hadoop 代码,所有源代码都在那里供您探索。不要害怕这样做!