提交 Flink 作业时不兼容 class 版本

Incompatible class versions when submitting Flink a job

我正在尝试使用 Beam 2.27/Flink 1.12 提交流作业,使用以下 Maven 命令行:

mvn exec:java -Dexec.mainClass=org.example.MyPipelineClass -Pflink-runner -Dexec.args="--runner=FlinkRunner --flinkMaster=flink-host:8081 --filesToStage=target/pipelines-bundled-1.0.0.jar"

事情进展顺利了一点,但我最近进行了一些编辑,想 运行 新版本的管道,现在出现以下错误:

Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local class incompatible: stream classdesc serialVersionUID = -3137689219135046939, local class serialVersionUID = 3698633776553163849
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
    at org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:263)
    ... 21 more

我的 pom.xml 看起来像这样:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>pipelines</artifactId>
  <version>1.0.0</version>

  <packaging>jar</packaging>

  <properties>
      <beam.version>2.27.0</beam.version>
      <flink.version>1.12</flink.version>
      <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
      <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
      <maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
      <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>flink-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-flink-${flink.version}</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- note that the pipeline runs on GCP -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.google.cloud.bigtable</groupId>
          <artifactId>bigtable-client-core</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- main output is JDBC/Postgres -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-jdbc</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.4</version>
    </dependency>
  </dependencies>
</project>

当然,在提交作业之前,我使用 mvn clean package -Pflink-runner.

构建了 JAR

Flink 在 GKE 实例上自托管,并使用 flink:1.12.4-java8 映像部署。

我发现 this issue 似乎有相同的错误信息,但没有明确的违规者。

任何帮助或探索的想法将不胜感激,我不知道要调查什么。

我发现这个问题属于我自己。在上面的pom.xml中,我指定Beam runner是beam-runners-flink-1.12,集群是运行 Flink 1.12.4

Beam的Flink runner未指定补丁,使用Flink 1.12.0

降级集群解决了我的问题。