使用 Spark-Submit 以 "local" 模式使用 S3A Directory Committer 写入 S3

Using Spark-Submit to write to S3 in "local" mode using S3A Directory Committer

我目前 运行 PySpark 通过本地模式。我希望能够通过 S3 Directory Committer 有效地将镶木地板文件输出到 S3。这个 PySpark 实例正在使用本地磁盘,而不是 HDFS,因为它是通过 spark-submit --master local[*].

提交的

我可以在不启用目录提交程序的情况下成功写入我的 S3 实例。但是,这涉及将暂存文件写入 S3 并重命名它们,速度慢且不可靠。我希望 Spark 作为临时存储写入我的本地文件系统,然后复制到 S3。

我的 PySpark conf 中有以下配置:

self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
self.spark.conf.set("spark.hadoop.fs.s3a.committer.name", "directory")
self.spark.conf.set("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
self.spark.conf.set("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

我的 spark-submit 命令如下所示:

spark-submit --master local[*] --py-files files.zip --packages com.amazonaws:aws-java-sdk:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.spark.internal.io.cloud.PathOutputCommitProtocol --driver-memory 4G --name clean-raw-recording_data main.py 

spark-submit 给我以下错误,因为必需的 JAR 没有到位:

java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

我的问题是:

  1. 为了能够引用 PathOutputCommitProtocol,我需要在 spark-submit --packages 中包含哪个 JAR(特别是 Maven 坐标)?
  2. 一旦我 (1) 开始工作,我是否能够使用 PySpark 的本地模式在本地文件系统上暂存临时文件?还是 HDFS 是一个严格的要求?

我需要这个 运行 在本地模式,而不是集群模式。

编辑:

我使用以下配置使其工作: 使用 pyspark 版本 3.1.2 和包

org.apache.spark:spark-hadoop-cloud_2.12:3.1.1.3.1.7270.0-253.

我需要使用 spark-submit--repositories 选项添加 cloudera 存储库:

spark-submit --repositories https://repository.cloudera.com/artifactory/cloudera-repos/ --packages com.amazonaws:aws-java-sdk:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.spark:spark-hadoop-cloud_2.12:3.1.1.3.1.7270.0-253
  1. 您需要 spark-hadoop-cloud 模块来发布您正在使用的 spark
  2. 提交者很高兴使用本地 fs(现在是 public 集成测试套件工作 https://github.com/hortonworks-spark/cloud-integration。所需要的只是一个在所有工作人员和 spark 驱动程序之间共享的“真实”文件系统,因此驱动程序获取每个待处理提交的清单。
  3. 在作业后打印 _SUCCESS 文件以查看提交者做了什么:0 字节文件 == 旧提交者,JSON 有诊断 == 新提交者