Issue writing to BigTable with the bulkPut API after upgrading Spark and Scala versions

I am writing to BigTable using the JavaHBaseContext bulkPut API (a rough sketch of the call pattern follows the version block below). This worked fine with the following Spark and Scala versions:

<spark.version>2.3.4</spark.version>
<scala.version>2.11.8</scala.version>
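
For context, the write path looks roughly like the sketch below. This is a minimal, hedged reconstruction, not the actual production code: the table name, column family, and input records are placeholders, and for BigTable the Configuration would carry the bigtable-hbase connection settings (project id, instance id) rather than a plain HBase one.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BulkPutSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[*]", "bulkput-sketch");
        // Placeholder: for BigTable this Configuration would be populated with
        // the bigtable-hbase settings instead of a vanilla HBase configuration.
        Configuration conf = HBaseConfiguration.create();
        JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

        JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("row1,value1", "row2,value2"));

        // bulkPut maps each record to a Put and writes them out inside a single
        // foreachPartition pass -- the same code path (HBaseContext.scala:225/488)
        // that appears in the stack traces below.
        hbaseContext.bulkPut(rdd, TableName.valueOf("my-table"), record -> {
            String[] parts = record.split(",");
            Put put = new Put(Bytes.toBytes(parts[0]));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
            return put;
        });

        jsc.stop();
    }
}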

We recently created a new Dataproc cluster with Spark 2.4 (image version 1.5.23-debian10). I upgraded the Spark and Scala dependencies to the following versions, and the job started failing with the errors listed below.

<spark.version>2.4.7</spark.version>
<scala.version>2.12.10</scala.version>

java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.spark.HBaseConnectionCache$
        at org.apache.hadoop.hbase.spark.HBaseContext.org$apache$hadoop$hbase$spark$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:488)
        at org.apache.hadoop.hbase.spark.HBaseContext$$anonfun$bulkPut.apply(HBaseContext.scala:225)
        at org.apache.hadoop.hbase.spark.HBaseContext$$anonfun$bulkPut.apply(HBaseContext.scala:225)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition(RDD.scala:980)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$adapted(RDD.scala:980)
        at org.apache.spark.SparkContext.$anonfun$runJob(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/11/17 14:49:48 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 2.0 in stage 12.0, java.lang.NoClassDefFoundError: scala/Product$class
        at org.apache.hadoop.hbase.spark.HBaseConnectionCacheStat.<init>(HBaseConnectionCache.scala:261)
        at org.apache.hadoop.hbase.spark.HBaseConnectionCache$.<init>(HBaseConnectionCache.scala:37)
        at org.apache.hadoop.hbase.spark.HBaseConnectionCache$.<clinit>(HBaseConnectionCache.scala)
        at org.apache.hadoop.hbase.spark.HBaseContext.org$apache$hadoop$hbase$spark$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:488)
        at org.apache.hadoop.hbase.spark.HBaseContext$$anonfun$bulkPut.apply(HBaseContext.scala:225)
        at org.apache.hadoop.hbase.spark.HBaseContext$$anonfun$bulkPut.apply(HBaseContext.scala:225)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition(RDD.scala:980)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$adapted(RDD.scala:980)
        at org.apache.spark.SparkContext.$anonfun$runJob(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 17 more

pom.xml

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.4.7</spark.version>
        <scala.version>2.12.10</scala.version>
    </properties>

    <distributionManagement>
        <snapshotRepository>
            <id>sonatype-nexus-snapshots</id>
            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        </snapshotRepository>
        <repository>
            <id>sonatype-nexus-staging</id>
            <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
        </repository>
    </distributionManagement>

    <dependencies>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-shared-dependencies</artifactId>
            <version>0.13.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquery</artifactId>
            <version>1.116.10</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-jdk5</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>failureaccess</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>listenablefuture</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.spark</groupId>
            <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
            <version>0.17.3</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-jdk5</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>failureaccess</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>listenablefuture</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>
                </exclusion>
            </exclusions>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.0-jre</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
            </exclusions>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>1.113.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>failureaccess</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>listenablefuture</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>2.0.2.3.1.0.0-78</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>failureaccess</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>listenablefuture</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.bigtable</groupId>
            <artifactId>bigtable-hbase-2.x-hadoop</artifactId>
            <version>1.16.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>failureaccess</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>listenablefuture</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- Maven Shade Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <finalName>${project.artifactId}-${project.version}</finalName>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.protobuf</pattern>
                                    <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>shaded.com.google.common</shadedPattern>
                                </relocation>
                            </relocations>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>0.8.5</version>
                <executions>
                    <execution>
                        <id>prepare-agent</id>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                        <configuration>
                            <destFile>target/ut-coverage.exec</destFile>
                        </configuration>
                    </execution>
                    <execution>
                        <id>report</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                        <configuration>
                            <dataFile>target/ut-coverage.exec</dataFile>
                            <outputDirectory>target/jacoco-ut</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

It looks to me like it cannot initialize the HBaseConnectionCache class because it cannot find scala/Product$class. But I have excluded scala-library from every transitive dependency, so it should now come only from my own explicit dependency. Any clue would be helpful.

The exception appears to be related to the dependency org.apache.hbase:hbase-spark:2.0.2.3.1.0.0-78:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.spark.HBaseConnectionCache$
        at org.apache.hadoop.hbase.spark.HBaseContext.org$apache$hadoop$hbase$spark$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:488)
        at org.apache.hadoop.hbase.spark.HBaseContext$$anonfun$bulkPut.apply(HBaseContext.scala:225)
        at org.apache.hadoop.hbase.spark.HBaseContext$$anonfun$bulkPut.apply(HBaseContext.scala:225)

Looking at the Maven page for that artifact, it was built with Scala 2.11, which likely explains why it does not work on Dataproc 1.5, which ships with Scala 2.12. The scala/Product$class error is the classic symptom of that mismatch: Scala 2.12 changed the trait encoding to Java 8 default methods, so the Foo$class implementation classes that 2.11-compiled bytecode references no longer exist on a 2.12 runtime. If you want to verify which scala-library your job actually resolves, see the diagnostic sketch below.
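
A quick way to check (my own sketch, not from the original posts) is to print the Scala version string and the location of the jar that provides scala.Product:

public class ScalaRuntimeCheck {
    public static void main(String[] args) {
        // Prints e.g. "2.11.12" or "2.12.10", depending on which scala-library
        // actually won dependency resolution.
        System.out.println("Scala version: " + scala.util.Properties.versionNumberString());

        // Locate the jar that provides scala.Product. On a 2.12 runtime the
        // separate implementation class scala.Product$class does not exist,
        // which is exactly what the NoClassDefFoundError above complains about.
        System.out.println("scala-library jar: " +
                scala.Product.class.getProtectionDomain().getCodeSource().getLocation());
    }
}

Running the same check from inside an executor-side closure would also show whether the driver and the executors resolve different Scala runtimes.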

I think you could try Dataproc 1.4, which ships with Spark 2.4 and Scala 2.11.12, and update your application's dependencies accordingly, roughly as in the snippet below.
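
Concretely, that would mean flipping the pom back to Scala 2.11 coordinates. A sketch of the relevant changes (the exact patch versions are assumptions to be checked against what Dataproc 1.4 actually bundles):

<properties>
    <spark.version>2.4.7</spark.version>
    <!-- Assumption: align with the Spark 2.4.x / Scala 2.11.12 that Dataproc 1.4 ships. -->
    <scala.version>2.11.12</scala.version>
</properties>

<!-- Every _2.12 artifact suffix must flip to _2.11 as well, e.g.: -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>com.google.cloud.spark</groupId>
    <!-- Assumption: this connector version is also published as a _2.11 build. -->
    <artifactId>spark-bigquery-with-dependencies_2.11</artifactId>
    <version>0.17.3</version>
</dependency>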