Spark-submit options for gcs-connector to access Google Cloud Storage

I am running a Spark job on a self-managed cluster (i.e., a local environment) that accesses buckets on Google Cloud Storage.

❯ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.

If I run the job with a locally downloaded gcs-connector, via the following command, it completes successfully.

spark-submit\
 --name CreateAllDataDFWithSpark\
 --jars ./gcs-connector-hadoop3-2.2.2.jar\
 --packages org.apache.spark:spark-avro_2.12:3.1.2\
 --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\
 <path_to>/.cache/pypoetry/virtualenvs/poc-TZFypELR-py3.7/lib/python3.7/site-packages/luigi/contrib/pyspark_runner.py\
 /tmp/CreateAllDataDFWithSpark78itslb5/CreateAllDataDFWithSpark.pickle

On the other hand, if I run the job without downloading the gcs-connector in advance, as follows,

spark-submit\
 --name CreateAllDataDFWithSpark\
 --packages org.apache.spark:spark-avro_2.12:3.1.2,com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2\
 --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\
 --conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS\
 <path_to>/.cache/pypoetry/virtualenvs/poc-TZFypELR-py3.7/lib/python3.7/site-packages/luigi/contrib/pyspark_runner.py\
 /tmp/CreateAllDataDFWithSpark1gf54xue/CreateAllDataDFWithSpark.pickle

it fails with the following error:

  ...
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3302)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:377)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:307)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    ... 24 more
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;JJ)V
    at com.google.cloud.hadoop.gcsio.cooplock.CooperativeLockingOptions$Builder.build(CooperativeLockingOptions.java:58)
    at com.google.cloud.hadoop.gcsio.cooplock.CooperativeLockingOptions.<clinit>(CooperativeLockingOptions.java:33)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.<clinit>(GoogleHadoopFileSystemConfiguration.java:383)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.<init>(GoogleHadoopFileSystemBase.java:246)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.<init>(GoogleHadoopFileSystem.java:58)
    ... 29 more

I don't understand why the second command does not work.

I would appreciate any suggestions or comments. Thank you!

As mentioned in the comments, this stems from a Guava version incompatibility between the GCS connector's dependencies and the ones bundled in your Spark distribution. Specifically, gcs-connector hadoop3-2.2.2 depends on Guava 30.1-jre, whereas Spark 3.1.2 ships Guava 14.0.1 as a "provided" dependency.
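
You can confirm the mismatch locally. A minimal sketch, assuming SPARK_HOME points at your Spark 3.1.2 installation (the exact Guava jar name may differ):

# Guava bundled with the Spark distribution:
ls "$SPARK_HOME"/jars/ | grep -i guava
# e.g. guava-14.0.1.jar

# Check whether that Guava has the checkArgument(boolean, String, long, long)
# overload the connector calls -- i.e. the (ZLjava/lang/String;JJ)V signature
# from the NoSuchMethodError above. In Guava 14.0.1 it is absent.
javap -cp "$SPARK_HOME"/jars/guava-14.0.1.jar com.google.common.base.Preconditions | grep checkArgument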

Between those two commands, that the classpath happens to be loaded in the right order in your first approach, making it work, is more or less luck, and it may unexpectedly break again when additional jars are added.
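
To see which copies of the conflicting class are actually in play, here is a diagnostic sketch (assumptions: --packages artifacts are cached under ~/.ivy2/jars, and the listed paths match your setup):

# List every jar on the effective classpath that contains the conflicting
# Guava class; whichever copy the JVM resolves first wins.
for j in "$SPARK_HOME"/jars/*.jar ~/.ivy2/jars/*.jar ./gcs-connector-hadoop3-2.2.2.jar; do
  unzip -l "$j" 2>/dev/null | grep -q 'com/google/common/base/Preconditions.class' && echo "$j"
done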

Ideally, you'd want to host your own jarfiles anyway, to minimize runtime dependencies on external (Maven) repositories, so pre-installing the jarfile is the right approach. When you do so, you should consider using the full shaded jarfile (also available on Maven Central) rather than the minimal GCS connector jarfile, to avoid classloading version issues in the future.
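
For example, here is a sketch of your first command switched to the shaded artifact (assuming the -shaded classifier published for gcs-connector hadoop3-2.2.2 on Maven Central; the shaded jar relocates Guava and its other dependencies internally, so it cannot clash with Spark's Guava 14.0.1):

# Download the shaded connector once, then host/pre-install it yourself.
curl -fLO https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.2/gcs-connector-hadoop3-2.2.2-shaded.jar

spark-submit \
 --name CreateAllDataDFWithSpark \
 --jars ./gcs-connector-hadoop3-2.2.2-shaded.jar \
 --packages org.apache.spark:spark-avro_2.12:3.1.2 \
 --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
 <path_to>/.cache/pypoetry/virtualenvs/poc-TZFypELR-py3.7/lib/python3.7/site-packages/luigi/contrib/pyspark_runner.py \
 /tmp/CreateAllDataDFWithSpark78itslb5/CreateAllDataDFWithSpark.pickle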