使用列的子集时,在 EMR/Spark 的 S3 上使用 Parquet 会节省带宽吗?

Does using Parquet on S3 with EMR/Spark save bandwidth when using subset of columns?

我有一个 EMR 集群 运行 Spark。在第一步中,CSV 文件被转换为按 date 列分区的 paruqet.snappy 格式,所以我剩下

s3://my-bucket/dataset/date=2020-12-20/part-0001.parquet.snappy
s3://my-bucket/dataset/date=2020-12-20/part-0002.parquet.snappy
s3://my-bucket/dataset/date=2020-12-20/part-0003.parquet.snappy
s3://my-bucket/dataset/date=2020-12-20/part-0004.parquet.snappy

列是

id,name,value

后续作业处理此数据:

df = spark.read.parquet('s3://my-bucket/dataset')
df.registerAsTempView('dataset')
spark.sql('''
select id, 
       sum(value)
from dataset
where date=2020-12-20
group by 1;
''')

所以在查询中我没有使用 name 列。根据我对 Parquet 的了解,根本不会从磁盘读取对应于列 name 的数据块。

问题:

A) 数据集的所有part-000x部分是否真的从S3下载到Spark集群,只是只加载了需要的列到内存中(没有节省带宽,但还是有好处的将列读入内存时的列格式)

B) Spark 可以以某种方式 seek() 进入 S3 上的文件,以便它只能下载与所需列相对应的部分的某些小节? (节省带宽)

尝试优化列式数据的对象存储读取是一个非常有趣的问题。

我不能代表 EMR S3 连接器,因为我还没有看到它的代码。

但是一般通过spark读取s3上的parquet文件的计划是

  • 文件已打开(HEAD。可能是 GET)
  • 寻找 EOF 后的几个字节,读取魔术文件类型(安全检查)和完整页脚的位置
  • 查找并阅读页脚以说明列在文件、模式等中的位置

然后它完成工作:seek/bulk读取带有它想要的列的条纹query/include。

具体使用哪些 API 取决于库; parquet.jar 以 2MB 的块大小执行 readFully(offset)。 2MB 块的背靠背读取非常常见。

如果有任何谓词下推,则读取条带后的条带摘要数据,然后跳过条带,或者需要它,在这种情况下,会向后 seek/full 读取条带。

在使用任何对象存储时,虽然带宽有限,但处理“最佳”搜索实际上是让人们忙碌的事情。从 0-EOF 执行完整的 GET,一旦 parquet 代码执行 seek(),您必须决定是读取并丢弃请求中剩余的字节,还是中止 HTTPS 连接,从新的开始获取一个新的位置并承担新 TLS 协商的成本。

随机 IO 搜索策略说“执行更短的 GET,以便我们可以为下一个块回收连接”;对于 Parquet 和 ORC 效果最好,对于 .csv、.avro 等来说很糟糕。然后是“你想做范围+“额外”的 GET 吗,因为那些常见的背靠背读取。甚至:你应该预取吗下一个街区?

最新的 ASF 连接器(S3 的 S3A 和 Azure 存储的 ABFS)的普遍共识是:在第一次向后搜索时开始顺序并切换到随机 IO;添加一种更改默认值的方法。ABFS 也做了一些下一个块的异步预取,这可以提高顺序读取,如果不需要,在 CPU、网络和 Azure 计费成本方面也不会太昂贵。

哦,有时代码会连续多次执行 seek(l1)、seek(l2),因此您只需记住该偏移量,只需担心在第一个 read() 时发出 GET 请求。

就像我说的:这很复杂。有很多调整的机会。

如果您使用的是 s3a:// 连接器,则在其上调用 .toString() 并获得有关已读取、跳过、丢弃、连接中止等数据量的统计信息等等

进一步阅读:s3a connector input stream seek code