Spark magic output committer settings not recognized

I'm experimenting with different Spark output committer settings for S3 and wanted to try out the magic committer. So far I haven't managed to get my jobs to use the magic committer; they always appear to rely on the file output committer.

The Spark job I'm running is a simple PySpark test job that runs a simple query, repartitions the data and outputs parquet to s3:

df = spark.sql("select * from some_table where some_condition")

df.write \
  .partitionBy("some_column") \
  .parquet("s3://some-bucket/some-folder", mode="overwrite")

The relevant Spark settings are (taken from the Spark UI, the job's Environment tab):

spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a   org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.hadoop.fs.s3a.committer.magic.enabled true
spark.hadoop.fs.s3a.committer.name  magic
spark.hadoop.fs.s3a.committer.staging.tmp.path  tmp/staging
spark.hadoop.fs.s3a.committer.staging.unique-filenames  true
spark.sql.parquet.output.committer.class    org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass   org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
mapreduce.output.fileoutputformat.compress  false
mapreduce.output.fileoutputformat.compress.codec    org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type RECORD
mapreduce.outputcommitter.factory.scheme.s3a    org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
mapreduce.fileoutputcommitter.algorithm.version 1
mapreduce.fileoutputcommitter.task.cleanup.enabled  false

Hadoop properties:

fs.s3a.committer.magic.enabled  true
fs.s3a.committer.name   magic

(Let me know if there are any other settings that might be relevant)
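
For completeness, here is a sketch of how these settings could be supplied when building the session. The config keys and values are the ones listed above; passing them through the builder is just one option (they could equally come from spark-submit --conf or spark-defaults.conf):

    # Sketch only: the keys/values are those from the Environment tab above;
    # how they are passed (builder vs. spark-submit --conf) is an assumption.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.hadoop.fs.s3a.committer.name", "magic")
        .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
        .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
                "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
        .config("spark.sql.sources.commitProtocolClass",
                "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
        .config("spark.sql.parquet.output.committer.class",
                "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
        .getOrCreate()
    )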

I'm basing my conclusion that the file committer, not the magic committer, is being used on the following observations:

  1. Various log lines produced by the Spark job seem to indicate that the file output committer is in use:
"class":"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter","file_line":"FileOutputCommitter.java:601","func":"commitTask","message":"Saved output of task 'attempt_2021...' to s3://some-bucket/some-folder/_temporary/0/
task_2021..."
"class":"org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat","file_line":"ParquetFileFormat.scala:54","message":"U
sing user defined output committer for Parquet: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"      
"class":"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter","file_line":"FileOutputCommitter.java:141","func":"<init>","message":"File Outpu
t Committer Algorithm version is 1"                                                                                  
"class":"org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter","file_line":"FileOutputCommitter.java:156","func":"<init>","message":"FileOutput
Committer skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false"    
  2. When setting the file committer's algorithm to an invalid number, like so:
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version -7

the file committer's constructor throws an exception saying the value is invalid, hinting that it is the file committer rather than the magic committer that gets initialized (a sketch of this check follows below).
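
For reference, a sketch of how that check can be reproduced from an existing session. This uses the internal _jsc handle to reach the Hadoop configuration; setting spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version at submit time, as in the line above, behaves the same way:

    # Deliberately set an invalid algorithm version on the session's Hadoop configuration,
    # then retry the write. If FileOutputCommitter is the committer actually being
    # constructed, the write fails fast with "Only 1 or 2 algorithm version is supported".
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "mapreduce.fileoutputcommitter.algorithm.version", "-7"
    )
    df.write \
        .partitionBy("some_column") \
        .parquet("s3://some-bucket/some-folder", mode="overwrite")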

I don't see any logs indicating that the magic committer was used, nor any failures while initializing a committer that could explain a fallback to the file committer.

The Spark version is 3.1.2, using this spark-hadoop-cloud JAR. Let me know if there are any other officially released JARs I could try, or any other log indications that might be relevant.

Any ideas?

===== EDIT:

Below is the stack trace I see when setting the file committer algorithm to an invalid value. It seems the call to org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.setupCommitter ends up calling org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory.createOutputCommitter, which in turn initializes the incorrect type org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter instead of the configured type org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

    Py4JJavaError: An error occurred while calling o259.parquet.
: java.io.IOException: Only 1 or 2 algorithm version is supported
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:143)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:117)
    at org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.createFileOutputCommitter(PathOutputCommitterFactory.java:134)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory.createOutputCommitter(FileOutputCommitterFactory.java:35)
    at org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.createCommitter(PathOutputCommitterFactory.java:201)
    at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.setupCommitter(PathOutputCommitProtocol.scala:88)
    at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.setupCommitter(PathOutputCommitProtocol.scala:49)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:177)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

This sounds like a binding problem, but I can't immediately see where it is. At a glance, you have all the right settings.

The easiest way to check whether an S3 committer is being used is to look at the _SUCCESS file. If it contains JSON then one of the new committers was used... and the text inside will tell you more about the committer.

A 0-byte file means the classic file output committer is still being used.
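
For example, a quick way to inspect that file (a sketch assuming boto3 and the bucket/prefix from the question):

    # Sketch: read the _SUCCESS marker and check whether it is empty or a JSON manifest.
    # Bucket and key are placeholders taken from the question's output path.
    import boto3

    s3 = boto3.client("s3")
    obj = s3.get_object(Bucket="some-bucket", Key="some-folder/_SUCCESS")
    body = obj["Body"].read()
    print(f"{len(body)} bytes")
    print(body[:500].decode("utf-8", errors="replace"))
    # 0 bytes       -> classic FileOutputCommitter wrote it
    # JSON manifest -> one of the new S3A committers wrote it (the JSON names the committer)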

Mystery solved - the failure to initialize the magic committer was caused by a mismatch between the committer factory scheme setting and the scheme of the actual destination URL. Consider the following:

The committer factory was configured using the key spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a - meaning the setting applies to URLs with the s3a scheme.

However, the URL passed to the write method was s3://some-bucket/some-folder - using the s3 scheme rather than s3a.

The Hadoop class PathOutputCommitterFactory searches for a configuration key matching the pattern mapreduce.outputcommitter.factory.scheme.%s to identify which factory to use for a given output URL. If the scheme set in the configuration key (s3a in this case) does not match the scheme of the destination URL (s3 in this case), the committer factory setting is not recognized and the factory type falls back to FileOutputCommitter.
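
To make that lookup concrete, here is a small illustration of how the key is derived from the destination scheme (a sketch of the logic only, not the actual Hadoop code; the comments assume the configuration shown in the question):

    # Illustration only: the factory config key depends on the destination URL's scheme.
    from urllib.parse import urlparse

    def factory_key(output_url: str) -> str:
        scheme = urlparse(output_url).scheme
        return "mapreduce.outputcommitter.factory.scheme." + scheme

    print(factory_key("s3://some-bucket/some-folder"))
    # -> mapreduce.outputcommitter.factory.scheme.s3   (not set, so FileOutputCommitter is used)
    print(factory_key("s3a://some-bucket/some-folder"))
    # -> mapreduce.outputcommitter.factory.scheme.s3a  (set to S3ACommitterFactory above)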

The solution - make sure the outputcommitter.factory.scheme.<scheme> setting matches the scheme of the destination URL. I've tested successfully with both s3 and s3a in the URL & config key.
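
In code terms, either of the following lines things up (a sketch based on the paths and settings used above):

    # Option A: write to an s3a:// URL so the scheme matches the configured factory key.
    df.write \
        .partitionBy("some_column") \
        .parquet("s3a://some-bucket/some-folder", mode="overwrite")

    # Option B: alternatively, also register the factory for the s3 scheme,
    # set before the SparkSession is created (e.g. via spark-submit --conf):
    #   spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory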