EMR 上的 Spark 究竟是如何从 S3 读取的?

How exactly does Spark on EMR read from S3?

关于使用 Spark 将 s3 上的文件读入 EMR 集群背后的实际机制的几个简单问题:

Does spark.read.format("com.databricks.spark.csv").load("s3://my/dataset/").where($"state" === "WA") communicate the whole dataset into the EMR cluster's local HDFS and then perform the filter after? Or does it filter records when bringing the dataset into the cluster? Or does it do neither? If this is the case, what's actually happening?

官方文档没有解释发生了什么(或者即使有解释,我也找不到)。有人可以用这样的解释来解释或 link 资源吗?

当您指定位于 S3 上的文件时,它们将被读入集群。处理发生在集群节点上。

不过,S3 Select 可能会改变这一点,目前处于预览阶段。

我不能代表闭源 AWS,但 ASF s3a: 连接器在 S3AInputStream

中发挥作用

通过 HTTPS 读取数据,启动时间较慢,如果您需要在 GET 完成之前停止下载,将强制您中止 TCP 流并创建一个新流。

为了降低成本,代码具有

等功能
  • 惰性搜索:当您执行 seek() 时,它会更新其内部指针,但不会发出新的 GET,直到您实际进行读取。

  • 根据剩余数量选择是中止 () 还是读取以结束 GET

  • 有3种IO模式:

"sequential",GET内容范围是从(pos, EOF)。最好的带宽,最差的寻道性能。适用于:CSV、.gz、...

"random":小 GET,最小(块大小,长度(读取))。最适合以可搜索格式 (snappy) 压缩的柱状数据(ORC、Parquet)

"adaptive"(上周新增,基于微软在 Azure WASB 连接器上的一些工作)。从顺序开始,一旦你向后搜索切换到随机 IO

代码就在这里,欢迎改进。当前性能工作(尤其是随机 IO)基于 Hive 上 ORC 数据的 TPC-DS 基准测试,顺便说一句)

假设您正在读取 CSV 并在那里进行过滤,它将读取整个 CSV 文件并进行过滤。这对于大文件来说非常低效。最好导入到列格式中,并对下面的层使用谓词下推来查找文件以进行过滤和读取列

  • 从 S3 (s3://-) 加载数据通常通过 EMR 中的 EMRFS
  • EMRFS 直接访问 S3(不通过 HDFS)
  • Spark从S3加载数据时,根据StorageLevel(内存或磁盘)将数据存储为集群中的DataSet
  • 最后,Spark 过滤器加载数据