将大型镶木地板文件(5 亿行/1000 列)写入 S3 需要太多时间
Writing large parquet file (500 millions row / 1000 columns) to S3 takes too much time
首先让我介绍一下我的用例,我每天收到 5 亿行,如下所示:
ID |类别
1 | cat1, cat2, cat3, ..., catn
2 | cat1, catx, caty, ..., 另一个类别
输入数据:50 个压缩的 csv 文件,每个文件为 250 MB -> 总计:12.5 GB 压缩
目的是回答以下问题:查找属于 Catx 和 Caty 的所有 ID,查找属于 cat3 而不是 caty 的 ID 等...:即:ids in cat1 U cat2或ids cat3∩catx
假设类别是动态创建的(我每天都有一组新的类别)并且我的业务想要探索所有可能的交集和联合(我们没有一组固定的查询)我想出了以下解决方案:
我写了一个 spark 作业,将日期转换为一个胖稀疏矩阵,其中列是所有可能的类别加上列 ID,对于我设置为 true 的每一行和列,如果 id 属于此类别,则为 false:
ID |类别 1 |类别 2 |类别 3 |...|猫 |猫猫 |目录 |另一个类别 |....
1 |真 |真 |真 |...|真 |假 |假 |假 |....
2 |真|假|假|...|假|真 |真 |真 |....
SQL 可以简单地回答我的问题,例如,如果我想找到属于类别 cat1 和类别 catx 的所有 ID,那么我 运行 以下 sql 查询矩阵:
Select 来自 MyTable 的 ID,其中 cat1 = true 且 catx=true;
我选择将这个稀疏矩阵保存为压缩的parquet文件,我做出这个选择是考虑到稀疏性和查询性质,我认为列存储是最合适的存储格式。
这里描述的用例是我的观察结果,我可能遗漏了一些优化点:
- 转换后 12.5GB 压缩输入数据需要约 300GB 写入此稀疏矩阵,因为 parquet 需要太多时间和资源,spark1.6 独立集群 6 个 aws 实例 r4.4xlarge (i设置足够的并行化来分配工作并利用我拥有的所有工人)
我最终得到了太多的 parquet 文件,我并行化的越多,最小的 parquet 文件就越多。似乎每个 RDD 都提供一个镶木地板文件 -> 太多的小文件不是扫描的最佳选择,因为我的查询遍历了所有列值
我浏览了很多帖子,但仍然不明白为什么将 500 Million/1000 列压缩镶木地板写入 S3 需要这么多时间,一旦在 S3 上,小文件总计为~35G
找申请高手UI,作业挂在写阶段,转换阶段和洗牌好像resource/time不消耗
我尝试调整 parquet 参数,例如 group_size、page_size 和 disable_dictionnary,但没有看到性能改进。
我试图分割成更大的RDD并将它们写入S3以获得更大的parquet文件但是这个工作花费了太多时间,最后我杀了它
我可以 运行 使用由 4 个 r4.16xlarge 类型的 aws 实例组成的 spark 2.1 独立集群,在大约 1 小时内完成这项工作,我觉得我正在使用一个巨大的集群来实现了一个小改进,我得到的唯一好处是 运行 可以执行更多并行任务。我错过了什么吗?我也许可以利用 ~ 1 To RAM 来更好地实现这一点并获得更大的镶木地板文件。
你们对使用 spark 在 S3 上编写大型 parquet 文件有什么反馈吗?
我也想知道您对此解决方案的看法/批评。
感谢和问候。
它结合了 Spark 重新读取内容来做摘要(你可以禁用它)和提交工作的算法来做 rename(),它在 S3 中被一个副本模仿。
请参阅“Apache Spark and object stores了解更多详细信息和一些可以稍微加快您工作速度的开关(禁用摘要,使用较少重命名算法)
即使使用这些,您也会遇到延迟,并且由于 S3 最终是一致的,因此存在产生损坏输出的风险。写入临时 HDFS 文件系统然后在所有工作结束时复制到 S3 是最安全的
首先让我介绍一下我的用例,我每天收到 5 亿行,如下所示:
ID |类别
1 | cat1, cat2, cat3, ..., catn
2 | cat1, catx, caty, ..., 另一个类别
输入数据:50 个压缩的 csv 文件,每个文件为 250 MB -> 总计:12.5 GB 压缩
目的是回答以下问题:查找属于 Catx 和 Caty 的所有 ID,查找属于 cat3 而不是 caty 的 ID 等...:即:ids in cat1 U cat2或ids cat3∩catx
假设类别是动态创建的(我每天都有一组新的类别)并且我的业务想要探索所有可能的交集和联合(我们没有一组固定的查询)我想出了以下解决方案:
我写了一个 spark 作业,将日期转换为一个胖稀疏矩阵,其中列是所有可能的类别加上列 ID,对于我设置为 true 的每一行和列,如果 id 属于此类别,则为 false:
ID |类别 1 |类别 2 |类别 3 |...|猫 |猫猫 |目录 |另一个类别 |....
1 |真 |真 |真 |...|真 |假 |假 |假 |....
2 |真|假|假|...|假|真 |真 |真 |....
SQL 可以简单地回答我的问题,例如,如果我想找到属于类别 cat1 和类别 catx 的所有 ID,那么我 运行 以下 sql 查询矩阵:
Select 来自 MyTable 的 ID,其中 cat1 = true 且 catx=true;
我选择将这个稀疏矩阵保存为压缩的parquet文件,我做出这个选择是考虑到稀疏性和查询性质,我认为列存储是最合适的存储格式。
这里描述的用例是我的观察结果,我可能遗漏了一些优化点:
- 转换后 12.5GB 压缩输入数据需要约 300GB 写入此稀疏矩阵,因为 parquet 需要太多时间和资源,spark1.6 独立集群 6 个 aws 实例 r4.4xlarge (i设置足够的并行化来分配工作并利用我拥有的所有工人)
我最终得到了太多的 parquet 文件,我并行化的越多,最小的 parquet 文件就越多。似乎每个 RDD 都提供一个镶木地板文件 -> 太多的小文件不是扫描的最佳选择,因为我的查询遍历了所有列值
我浏览了很多帖子,但仍然不明白为什么将 500 Million/1000 列压缩镶木地板写入 S3 需要这么多时间,一旦在 S3 上,小文件总计为~35G
找申请高手UI,作业挂在写阶段,转换阶段和洗牌好像resource/time不消耗
我尝试调整 parquet 参数,例如 group_size、page_size 和 disable_dictionnary,但没有看到性能改进。
我试图分割成更大的RDD并将它们写入S3以获得更大的parquet文件但是这个工作花费了太多时间,最后我杀了它
我可以 运行 使用由 4 个 r4.16xlarge 类型的 aws 实例组成的 spark 2.1 独立集群,在大约 1 小时内完成这项工作,我觉得我正在使用一个巨大的集群来实现了一个小改进,我得到的唯一好处是 运行 可以执行更多并行任务。我错过了什么吗?我也许可以利用 ~ 1 To RAM 来更好地实现这一点并获得更大的镶木地板文件。
你们对使用 spark 在 S3 上编写大型 parquet 文件有什么反馈吗?
我也想知道您对此解决方案的看法/批评。
感谢和问候。
它结合了 Spark 重新读取内容来做摘要(你可以禁用它)和提交工作的算法来做 rename(),它在 S3 中被一个副本模仿。
请参阅“Apache Spark and object stores了解更多详细信息和一些可以稍微加快您工作速度的开关(禁用摘要,使用较少重命名算法)
即使使用这些,您也会遇到延迟,并且由于 S3 最终是一致的,因此存在产生损坏输出的风险。写入临时 HDFS 文件系统然后在所有工作结束时复制到 S3 是最安全的