NoSuchMethodError when running a spark-streaming job on Kafka

I am using spark-streaming to consume protobuf-encoded messages from Kafka.
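
For context, the consuming side looks roughly like the sketch below. This is a minimal reconstruction from the stack trace, not the original source: the topic name, broker address, group id, batch interval, and the processing step are assumptions, while MetricDeserializer and Agent.Metric are the real classes involved.

import cn.xiaoman.eagleeye.Agent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Assumed to live in the same package as MetricDeserializer.
public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Works with "local[2]", fails once the real cluster master URL is used.
        SparkConf conf = new SparkConf().setAppName("rt-metric-processor").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "kafka:9092");              // assumption
        kafkaParams.put("group.id", "rt-metric-processor");              // assumption
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", MetricDeserializer.class); // parses Agent.Metric from protobuf bytes

        JavaInputDStream<ConsumerRecord<String, Agent.Metric>> stream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, Agent.Metric>Subscribe(
                        Collections.singletonList("metrics"), kafkaParams)); // topic name is an assumption

        // Placeholder processing step; the real job does more than print.
        stream.map(ConsumerRecord::value).print();

        ssc.start();
        ssc.awaitTermination();
    }
}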

It works fine with the master set to "local[2]", but when I change the master URL to that of a real Spark cluster, I get the following exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 4 times, most recent failure: Lost task 0.3 in stage 20.0 (TID 58, 10.0.5.155): java.lang.NoSuchMethodError: com.google.protobuf.CodedInputStream.readStringRequireUtf8()Ljava/lang/String;
    at cn.xiaoman.eagleeye.Agent$Tag.<init>(Agent.java:83)
    at cn.xiaoman.eagleeye.Agent$Tag.<init>(Agent.java:44)
    at cn.xiaoman.eagleeye.Agent$Tag.parsePartialFrom(Agent.java:638)
    at cn.xiaoman.eagleeye.Agent$Tag.parsePartialFrom(Agent.java:633)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at cn.xiaoman.eagleeye.Agent$Metric.<init>(Agent.java:797)
    at cn.xiaoman.eagleeye.Agent$Metric.<init>(Agent.java:718)
    at cn.xiaoman.eagleeye.Agent$Metric.parsePartialFrom(Agent.java:1754)
    at cn.xiaoman.eagleeye.Agent$Metric.parsePartialFrom(Agent.java:1749)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at cn.xiaoman.eagleeye.Agent$Metric.parseFrom(Agent.java:1058)
    at cn.xiaoman.eagleeye.rtmetricprocessor.MetricDeserializer.deserialize(MetricDeserializer.java:25)
    at cn.xiaoman.eagleeye.rtmetricprocessor.MetricDeserializer.deserialize(MetricDeserializer.java:14)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:627)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:548)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

版本: 火花:2.11-2.0.2 卡夫卡:2.11-0.10.1.0 协议缓冲区:3.0.2

The cause: the job depends on protobuf 3, while the Spark runtime depends on a different (older) protobuf version. On the cluster the runtime's copy is picked up first on the executor classpath, and it does not provide readStringRequireUtf8(), hence the NoSuchMethodError.
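
One way to confirm this (a diagnostic sketch, not part of the original job; the class name is made up) is to log which jar the protobuf classes are actually loaded from, both locally and from inside an executor:

public class ProtobufClasspathCheck {
    public static void main(String[] args) {
        // Prints the jar that provides CodedInputStream. Under local[2] this is the
        // application's protobuf 3 jar; on the cluster it is typically the older
        // protobuf bundled with the Spark/Hadoop runtime, which has no
        // readStringRequireUtf8() and therefore triggers the NoSuchMethodError.
        java.security.CodeSource source = com.google.protobuf.CodedInputStream.class
                .getProtectionDomain().getCodeSource();
        System.out.println("com.google.protobuf loaded from: "
                + (source == null ? "<bootstrap/unknown>" : source.getLocation()));
    }
}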

Solution: edit build.gradle to apply the 'com.github.johnrengelman.shadow' plugin and relocate com.google.protobuf to a different package name:

shadowJar {
    relocate 'com.google.protobuf', 'shadow.google.protobuf'
}
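
The relocation happens at the bytecode level when the shadow plugin builds the fat jar, so the application source (for example MetricDeserializer calling Agent.Metric.parseFrom) keeps importing com.google.protobuf as before; inside the shaded jar those references are rewritten to shadow.google.protobuf and no longer collide with the protobuf version bundled by the Spark runtime.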

Edit: adding the full build.gradle:

group 'xxx'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'application'
apply plugin: 'com.google.protobuf'
apply plugin: 'com.github.johnrengelman.shadow'
sourceCompatibility = 1.8

buildscript {
    repositories {
        mavenLocal()
        mavenCentral()
        jcenter()
    }
    dependencies {
        // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier
        // gradle versions
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
        classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3'
    }
}


def grpcVersion = '1.0.2'

repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
}

def sparkVersion = '2.0.2'
dependencies {
    compile "org.apache.spark:spark-streaming_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming-kafka-0-10_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-core_2.11:${sparkVersion}"
    compile 'com.google.protobuf:protobuf-java:3.1.0'

    compile group: 'org.mongodb', name: 'mongo-java-driver', version: '3.4.0'

    testCompile 'junit:junit:4.11'
}

protobuf {
    protoc {
        // The version of protoc must match protobuf-java. If you don't depend on
        // protobuf-java directly, you will be transitively depending on the
        // protobuf-java version that grpc depends on.
        artifact = 'com.google.protobuf:protoc:3.0.2'
    }
//    plugins {
//        grpc {
//            artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
//        }
//    }
//    generateProtoTasks {
//        all()*.plugins {
//            grpc {
//                // To generate deprecated interfaces and static bindService method,
//                // turn the enable_deprecated option to true below:
//                option 'enable_deprecated=false'
//            }
//        }
//    }
}

idea {
    module {
        // Not using generatedSourceDirs because of
        // https://discuss.gradle.org/t/support-for-intellij-2016/15294/8
        sourceDirs += file("${projectDir}/build/generated/source/proto/main/java");
    }
}

shadowJar {
    zip64 true
    relocate 'com.google.protobuf', 'shadow.google.protobuf'
}

mainClassName = "xxx.Main"
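
Build the job with the shadowJar task and submit the resulting fat jar with spark-submit instead of the plain jar (by default the shadow plugin writes it under build/libs with an "-all" classifier).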