net.jpounz.lz4 使用 spark streaming 从 kafka 读取时出现异常

net.jpounz.lz4 exception when reading from kafka with spark streaming

我使用 spark 2.4.0 python。并从 kafka_2.11-2.0.0 (二进制而非源代码) 中读取数据。我正在使用 spark-submit --jars sspark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar script.py 错误报告中出现一条错误消息,如果有人可以提供帮助,谢谢 :)

19/03/25 13:48:53 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
    at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
    at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
    at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:453)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
Exception in thread "stdout writer for python" java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
    at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
    at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
    at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:453)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

jar 文件的 pom.xml : spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar :

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.4.0</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project External Kafka Assembly</name>
  <url>http://spark.apache.org/</url>

  <properties>
    <sbt.project.name>streaming-kafka-0-8-assembly</sbt.project.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <!--
      Demote already included in the Spark assembly.
    -->
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.lz4</groupId>
      <artifactId>lz4-java</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <classifier>${avro.mapred.classifier}</classifier>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.xerial.snappy</groupId>
      <artifactId>snappy-java</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
  </dependencies>

  <build>
  <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
  <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <configuration>
        <shadedArtifactAttached>false</shadedArtifactAttached>
        <artifactSet>
          <includes>
            <include>*:*</include>
          </includes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
              <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                <resource>log4j.properties</resource>
              </transformer>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
</project>

我已经解决了这个问题,我正在使用以下命令执行我的代码:

bin/spark-submit --jars external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar Path/kafka_test.py localhost:2181 test

当我运行它用这个命令工作时:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 Path/kafka_test.py localhost:2181 test