Flink Avro 1.8.1 NoSuchMethodError when running on cluster

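We are running a Flink 1.3.0 CEP job and rely on Avro 1.8.1 (LogicalType does not exist in Avro 1.7.7) to serialize complex events (as POJOs). It works when running in the IDE (IntelliJ), but when we package the jar file and deploy it to the cluster, we get: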

java.lang.NoSuchMethodError: org.apache.avro.Schema.setLogicalType(Lorg/apache/avro/LogicalType;)V
at org.apache.avro.LogicalType.addToSchema(LogicalType.java:72)

However, judging from the build output, the jar we build does seem to contain the correct Avro version (1.8.1):

mvn clean package -Pbuild-jar
...
[INFO] Including org.apache.avro:avro:jar:1.8.1 in the shaded jar.
...

Question: How can we make sure that our Flink cluster uses the correct Avro version (1.8.1)?

Our pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.group.flink</groupId>
<artifactId>cep</artifactId>
<version>0.2</version>
<packaging>jar</packaging>

<name>Complex Event Processing</name>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.3.0</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <kafka.version>0.10.2.1</kafka.version>
    <scala.version>2.11</scala.version>
    <avro.version>1.8.1</avro.version>
    <junit.version>5.0.0-M4</junit.version>
</properties>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

<!-- 

    Execute "mvn clean package -Pbuild-jar"
    to build a jar file out of this project!

    How to use the Flink Quickstart pom:

    a) Adding new dependencies:
        You can add dependencies to the list below.
        Please check if the maven-shade-plugin below is filtering out your dependency
        and remove the exclude from there.

    b) Build a jar for running on the cluster:
        There are two options for creating a jar from this project

        b.1) "mvn clean package" -> this will create a fat jar which contains all
                dependencies necessary for running the jar created by this pom in a cluster.
                The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.

        b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
                nicer dependency exclusion handling. This approach is preferred and leads to
                much cleaner jar files.
-->

<dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- explicitly add a standard logging framework, as Flink does not (in the future) have
        a hard dependency on one specific framework by default -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>${junit.version}</version>
    </dependency>
    <dependency>
        <groupId>com.github.smueller18</groupId>
        <artifactId>avro-builder</artifactId>
        <version>0.5</version>
    </dependency>
    <dependency>
        <groupId>com.github.smueller18</groupId>
        <artifactId>flink-serialization</artifactId>
        <version>0.6</version>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9.9</version>
    </dependency>


</dependencies>

<profiles>
    <profile>
        <!-- Profile for packaging correct JAR files -->
        <id>build-jar</id>

        <activation>
            <activeByDefault>false</activeByDefault>
        </activation>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <!-- disable the exclusion rules -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes combine.self="override"></excludes>
                                </artifactSet>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </profile>
</profiles>

<build>
    <plugins>
        <!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
        except flink and its transitive dependencies. The resulting fat-jar can be executed
        on a cluster. Change the value of Program-Class if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.1</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <!-- This list contains all dependencies of flink-dist
                                Everything else will be packaged into the fat-jar
                                -->
                                <exclude>org.apache.flink:flink-annotations</exclude>
                                <exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
                                <exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
                                <exclude>org.apache.flink:flink-core</exclude>
                                <exclude>org.apache.flink:flink-java</exclude>
                                <exclude>org.apache.flink:flink-scala_2.11</exclude>
                                <exclude>org.apache.flink:flink-runtime_2.11</exclude>
                                <exclude>org.apache.flink:flink-optimizer_2.11</exclude>
                                <exclude>org.apache.flink:flink-clients_2.11</exclude>
                                <exclude>org.apache.flink:flink-avro_2.11</exclude>
                                <exclude>org.apache.flink:flink-examples-batch_2.11</exclude>
                                <exclude>org.apache.flink:flink-examples-streaming_2.11</exclude>
                                <exclude>org.apache.flink:flink-streaming-java_2.11</exclude>
                                <exclude>org.apache.flink:flink-streaming-scala_2.11</exclude>
                                <exclude>org.apache.flink:flink-scala-shell_2.11</exclude>
                                <exclude>org.apache.flink:flink-python</exclude>
                                <exclude>org.apache.flink:flink-metrics-core</exclude>
                                <exclude>org.apache.flink:flink-metrics-jmx</exclude>
                                <exclude>org.apache.flink:flink-statebackend-rocksdb_2.11</exclude>

                                <!-- Also exclude very big transitive dependencies of Flink

                                WARNING: You have to remove these excludes if your code relies on other
                                versions of these dependencies.

                                -->

                                <exclude>log4j:log4j</exclude>
                                <exclude>org.scala-lang:scala-library</exclude>
                                <exclude>org.scala-lang:scala-compiler</exclude>
                                <exclude>org.scala-lang:scala-reflect</exclude>
                                <exclude>com.data-artisans:flakka-actor_*</exclude>
                                <exclude>com.data-artisans:flakka-remote_*</exclude>
                                <exclude>com.data-artisans:flakka-slf4j_*</exclude>
                                <exclude>io.netty:netty-all</exclude>
                                <exclude>io.netty:netty</exclude>
                                <exclude>commons-fileupload:commons-fileupload</exclude>
                                <!--<exclude>org.apache.avro:avro</exclude>-->
                                <exclude>commons-collections:commons-collections</exclude>
                                <exclude>org.codehaus.jackson:jackson-core-asl</exclude>
                                <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
                                <exclude>com.thoughtworks.paranamer:paranamer</exclude>
                                <exclude>org.xerial.snappy:snappy-java</exclude>
                                <exclude>org.apache.commons:commons-compress</exclude>
                                <exclude>org.tukaani:xz</exclude>
                                <exclude>com.esotericsoftware.kryo:kryo</exclude>
                                <exclude>com.esotericsoftware.minlog:minlog</exclude>
                                <exclude>org.objenesis:objenesis</exclude>
                                <exclude>com.twitter:chill_*</exclude>
                                <exclude>com.twitter:chill-java</exclude>
                                <exclude>commons-lang:commons-lang</exclude>
                                <exclude>junit:junit</exclude>
                                <exclude>org.apache.commons:commons-lang3</exclude>
                                <exclude>org.slf4j:slf4j-api</exclude>
                                <exclude>org.slf4j:slf4j-log4j12</exclude>
                                <exclude>log4j:log4j</exclude>
                                <exclude>org.apache.commons:commons-math</exclude>
                                <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
                                <exclude>commons-logging:commons-logging</exclude>
                                <exclude>commons-codec:commons-codec</exclude>
                                <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
                                <exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
                                <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
                                <exclude>stax:stax-api</exclude>
                                <exclude>com.typesafe:config</exclude>
                                <exclude>org.uncommons.maths:uncommons-maths</exclude>
                                <exclude>com.github.scopt:scopt_*</exclude>
                                <exclude>commons-io:commons-io</exclude>
                                <exclude>commons-cli:commons-cli</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>org.apache.flink:*</artifact>
                                <excludes>
                                    <!-- exclude shaded google but include shaded curator -->
                                    <exclude>org/apache/flink/shaded/com/**</exclude>
                                    <exclude>web-docs/**</exclude>
                                </excludes>
                            </filter>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <!-- If you want to use ./bin/flink run <quickstart jar> uncomment the following lines.
                        This will add a Main-Class entry to the manifest file -->
                        <!--
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>de.hska.stcs.cep.StreamingJob</mainClass>
                            </transformer>
                        </transformers>
                        -->
                        <createDependencyReducedPom>false</createDependencyReducedPom>
                    </configuration>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source> <!-- compile for Java 8 -->
                <target>1.8</target> <!-- compile for Java 8 -->
            </configuration>
        </plugin>
    </plugins>


    <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
    <!--
    <pluginManagement>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerId>jdt</compilerId>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.eclipse.tycho</groupId>
                        <artifactId>tycho-compiler-jdt</artifactId>
                        <version>0.21.0</version>
                    </dependency>
                </dependencies>
            </plugin>

            <plugin>
                <groupId>org.eclipse.m2e</groupId>
                <artifactId>lifecycle-mapping</artifactId>
                <version>1.0.0</version>
                <configuration>
                    <lifecycleMappingMetadata>
                        <pluginExecutions>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-assembly-plugin</artifactId>
                                    <versionRange>[2.4,)</versionRange>
                                    <goals>
                                        <goal>single</goal>
                                    </goals>
                                </pluginExecutionFilter>
                                <action>
                                    <ignore/>
                                </action>
                            </pluginExecution>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <versionRange>[3.1,)</versionRange>
                                    <goals>
                                        <goal>testCompile</goal>
                                        <goal>compile</goal>
                                    </goals>
                                </pluginExecutionFilter>
                                <action>
                                    <ignore/>
                                </action>
                            </pluginExecution>
                        </pluginExecutions>
                    </lifecycleMappingMetadata>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
    -->
</build>

UPDATE: I built Flink 1.3 from source, changing the Avro version in Flink's pom.xml from 1.7.7 to 1.8.1, and it now seems to work. I'm still not sure why it didn't work with the fat jar.
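A likely explanation for the fat-jar behaviour (not verified against this exact setup): the shade excludes above list org.apache.avro:avro among the transitive dependencies of flink-dist, so the Flink 1.3 runtime already ships its own Avro (1.7.7). In 1.3, user-code class loading is parent-first (child-first resolution only arrived in Flink 1.4), so the cluster's copy of org.apache.avro.Schema is loaded instead of the 1.8.1 copy inside the job jar. One workaround that avoids rebuilding Flink is to relocate Avro inside the fat jar with the maven-shade-plugin, so the job's classes reference a renamed package. The sketch below assumes it is placed inside the shade plugin's `<configuration>` element; the prefix `com.group.flink.shaded` is an arbitrary, hypothetical choice.

```xml
<!-- Hypothetical addition inside the maven-shade-plugin <configuration>.
     The prefix "com.group.flink.shaded" is an arbitrary choice. -->
<relocations>
    <relocation>
        <!-- Rename Avro's packages inside the fat jar (and rewrite every
             reference to them in the bundled classes) so they cannot
             collide with the Avro 1.7.7 classes bundled in flink-dist -->
        <pattern>org.apache.avro</pattern>
        <shadedPattern>com.group.flink.shaded.org.apache.avro</shadedPattern>
    </relocation>
</relocations>
```

After rebuilding with `mvn clean package -Pbuild-jar`, the Avro classes should appear in the jar under the relocated package. One caveat: relocation only affects classes inside the fat jar, so if the job hands Avro objects to Flink's own Avro code (for example flink-avro, which is excluded from the jar), the two sides will see different classes.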

Try using the latest version of Avro:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>

Try putting the Avro jar into Flink's /lib folder.
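If you go the /lib route, the jar has to be copied into the lib directory of every Flink installation in the cluster. As a hypothetical convenience, the `copy` goal of the maven-dependency-plugin can fetch the matching Avro jar during the build. The plugin version 3.0.1 and the output directory `target/flink-lib` are assumptions. This copies only the Avro jar itself, not its transitive dependencies. Whether a jar in /lib takes precedence over the Avro classes bundled in flink-dist depends on the classpath order set by Flink's startup scripts, which is not checked here for 1.3.

```xml
<!-- Hypothetical plugin entry for <build><plugins>: copies
     avro-${avro.version}.jar into target/flink-lib during "package" -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>3.0.1</version>
    <executions>
        <execution>
            <id>copy-avro-for-flink-lib</id>
            <phase>package</phase>
            <goals>
                <goal>copy</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <!-- Only this artifact is copied; its transitive
                         dependencies are NOT included -->
                    <artifactItem>
                        <groupId>org.apache.avro</groupId>
                        <artifactId>avro</artifactId>
                        <version>${avro.version}</version>
                    </artifactItem>
                </artifactItems>
                <outputDirectory>${project.build.directory}/flink-lib</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
```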