Apache Beam Java SDK SparkRunner write to parquet error

I'm using Apache Beam in Java. I'm trying to read a CSV file and write it out in Parquet format, using the SparkRunner on a pre-deployed Spark environment in local mode. Everything works fine with the DirectRunner, but the SparkRunner doesn't work at all. I'm building a fat jar with the maven-shade plugin.

The code is as follows:

Java:

public class ImportCSVToParquet {
        // ... omitted ...
        File csv = new File(filePath);
        PCollection<String> vals = pipeline.apply(TextIO.read().from(filePath));

        String parquetFilename = csv.getName().replaceFirst("csv", "parquet");
        String outputLocation = FolderConventions.getRawFilePath(confETL.getHdfsRoot(), parquetFilename);

        PCollection<GenericRecord> processed = vals.apply(ParDo.of(new ProcessFiles.GenericRecordFromCsvFn()))
                .setCoder(AvroCoder.of(new Config().getTransactionSchema()));

        LOG.info("Processed file will be written to: " + outputLocation);
        processed.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(conf.getTransactionSchema())).to(outputLocation));

        pipeline.run().waitUntilFinish();
}

POM dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-spark</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-parquet</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
</dependencies>

Spark submit script:

spark-submit \
--class package.ImportCSVToParquet \
--master local[*] \
--executor-cores 2 \
--executor-memory 2g \
--driver-memory 2g \
--driver-cores 2 \
--conf spark.sql.codegen.wholeStage=false \
--conf spark.wholeStage.codegen=false \
--conf spark.sql.shuffle.partitions=2005 \
--conf spark.driver.maxResultSize=2g \
--conf spark.executor.memoryOverhead=4048 \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" \
--conf "spark.driver.extraJavaOptions=-Djava.io.tmpdir=/path-to-tmp/" \
--conf "spark.driver.extraClassPath=./" \
--jars path-to-jar \
/path-to-jar "$@"

I get the following error:

2019-08-07 13:37:49 ERROR Executor:91 - Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:176)
        at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1038)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
        at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
        at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.open(ParquetIO.java:304)
        at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.prepareWrite(FileIO.java:1359)
        at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:937)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn.processElement(WriteFiles.java:533)

The job seems to do the reading and the transformation, but fails when it tries to write to the filesystem. I'm not using HDFS at the moment. Any ideas?

I'm certain that ParquetIO depends on Parquet 1.10+, which added a "hadoop-neutral" API to the Parquet file readers/writers.

Spark 2.2.3 depends on Parquet 1.8.2, which does not have the builder(...) constructor that Beam's ParquetIO uses, and the exception confirms this.
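
If you want to double-check the mismatch yourself, a quick sanity check (assuming a standard Spark 2.x binary distribution with a jars/ directory, and a Maven build) is to compare the Parquet version shipped with the pre-deployed Spark against the one your build resolves for Beam:

# Parquet jars bundled with the Spark installation (expect 1.8.x for Spark 2.2.3)
ls "$SPARK_HOME/jars" | grep parquet

# Parquet version pulled in by the Beam/ParquetIO dependencies (expect 1.10.x, per the note above)
mvn dependency:tree -Dincludes=org.apache.parquet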

If possible, the simplest solution is to update to Spark 2.4, which bumps the Parquet version to 1.10.0.
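
As a rough sketch, assuming Spark 2.4.3 and staying on Scala 2.11, the bump would only touch the Spark artifacts in the POM above (the pre-deployed Spark installation would have to be on 2.4.x as well, since the conflicting parquet jars come from its classpath):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.3</version>
</dependency>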

If you can't upgrade the Spark version, there are a couple of techniques for overriding the jars that Spark brings in:

  1. You can set spark.(driver|executor).userClassPathFirst to true, which puts the classes in your fat jar ahead of the jars provided by Spark. This may work, or it may introduce new dependency conflicts (see the sketch after this list).

  2. You can try replacing parquet-xx-1.8.2.jar in your local Spark installation with parquet-xx-1.10.0 (assuming they are drop-in replacements). If that works, you can apply the same strategy to Spark jobs on a cluster by setting the spark.yarn.jars property when submitting the job.

  3. You can try shading Beam's ParquetIO and its Parquet dependencies inside your fat jar.
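
For option 1, the flags would be added as extra --conf entries on the spark-submit command shown above. This is only a sketch; note that the workaround at the end of this post ends up avoiding these experimental flags in favour of extraClassPath:

--conf spark.driver.userClassPathFirst=true \
--conf spark.executor.userClassPathFirst=true \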

Edit: this is a known issue, BEAM-5164.

Edit (workaround)

I followed the instructions with some modifications and managed to get it to work with Spark 2.2.3:

  • I used the Scala 2.11 dependencies and set them to <scope>provided</scope> (probably optional; see the sketch after the shade configuration below).

  • I added the following three relocations to the maven-shade-plugin:

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <filters>

... unchanged ...

          </filters>
          <relocations>
            <relocation>
              <pattern>org.apache.parquet</pattern>
              <shadedPattern>shaded.org.apache.parquet</shadedPattern>
            </relocation>
            <!-- Some packages are shaded already, and on the original spark classpath. Shade them more. -->
            <relocation>
              <pattern>shaded.parquet</pattern>
              <shadedPattern>reshaded.parquet</shadedPattern>
            </relocation>
            <relocation>
              <pattern>org.apache.avro</pattern>
              <shadedPattern>shaded.org.apache.avro</shadedPattern>
            </relocation>
          </relocations>
        </configuration>
        <executions>

... unchanged ...

        </executions>
      </plugin>
    </plugins>
  </build>
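
For the first bullet, a minimal sketch of what marking the Spark artifacts as provided could look like, assuming the same Spark 2.2.3 / Scala 2.11 coordinates as in the POM above:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.3</version>
    <!-- supplied by the pre-deployed Spark installation at runtime, so it stays out of the fat jar -->
    <scope>provided</scope>
</dependency>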

Do not use spark.driver.userClassPathFirst and spark.executor.userClassPathFirst, since they are still experimental. Instead, use spark.driver.extraClassPath and spark.executor.extraClassPath.

Definition from the official documentation: "Extra classpath entries to prepend to the classpath of the driver."

  • "prepend", as in, placed in front of Spark's core classpath.

Example:

--conf spark.driver.extraClassPath=C:\Users\Khalid\Documents\Projects\libs\jackson-annotations-2.6.0.jar;C:\Users\Khalid\Documents\Projects\libs\jackson-core-2.6.0.jar;C:\Users\Khalid\Documents\Projects\libs\jackson-databind-2.6.0.jar

This solved my problem (a conflict between the version of Jackson I wanted to use and the one Spark was using).

Hope this helps.