提交 Flink 作业时不兼容 class 版本
Incompatible class versions when submitting Flink a job
我正在尝试使用 Beam 2.27/Flink 1.12 提交流作业,使用以下 Maven 命令行:
mvn exec:java -Dexec.mainClass=org.example.MyPipelineClass -Pflink-runner -Dexec.args="--runner=FlinkRunner --flinkMaster=flink-host:8081 --filesToStage=target/pipelines-bundled-1.0.0.jar"
事情进展顺利了一点,但我最近进行了一些编辑,想 运行 新版本的管道,现在出现以下错误:
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local class incompatible: stream classdesc serialVersionUID = -3137689219135046939, local class serialVersionUID = 3698633776553163849
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
at org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:263)
... 21 more
我的 pom.xml 看起来像这样:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>pipelines</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<beam.version>2.27.0</beam.version>
<flink.version>1.12</flink.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>flink-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-${flink.version}</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- note that the pipeline runs on GCP -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- main output is JDBC/Postgres -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.4</version>
</dependency>
</dependencies>
</project>
当然,在提交作业之前,我使用 mvn clean package -Pflink-runner
.
构建了 JAR
Flink 在 GKE 实例上自托管,并使用 flink:1.12.4-java8
映像部署。
我发现 this issue 似乎有相同的错误信息,但没有明确的违规者。
任何帮助或探索的想法将不胜感激,我不知道要调查什么。
我发现这个问题属于我自己。在上面的pom.xml
中,我指定Beam runner是beam-runners-flink-1.12
,集群是运行 Flink 1.12.4
Beam的Flink runner未指定补丁,使用Flink 1.12.0
降级集群解决了我的问题。
我正在尝试使用 Beam 2.27/Flink 1.12 提交流作业,使用以下 Maven 命令行:
mvn exec:java -Dexec.mainClass=org.example.MyPipelineClass -Pflink-runner -Dexec.args="--runner=FlinkRunner --flinkMaster=flink-host:8081 --filesToStage=target/pipelines-bundled-1.0.0.jar"
事情进展顺利了一点,但我最近进行了一些编辑,想 运行 新版本的管道,现在出现以下错误:
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local class incompatible: stream classdesc serialVersionUID = -3137689219135046939, local class serialVersionUID = 3698633776553163849
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
at org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:263)
... 21 more
我的 pom.xml 看起来像这样:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>pipelines</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<beam.version>2.27.0</beam.version>
<flink.version>1.12</flink.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>flink-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-${flink.version}</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- note that the pipeline runs on GCP -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- main output is JDBC/Postgres -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.4</version>
</dependency>
</dependencies>
</project>
当然,在提交作业之前,我使用 mvn clean package -Pflink-runner
.
Flink 在 GKE 实例上自托管,并使用 flink:1.12.4-java8
映像部署。
我发现 this issue 似乎有相同的错误信息,但没有明确的违规者。
任何帮助或探索的想法将不胜感激,我不知道要调查什么。
我发现这个问题属于我自己。在上面的pom.xml
中,我指定Beam runner是beam-runners-flink-1.12
,集群是运行 Flink 1.12.4
Beam的Flink runner未指定补丁,使用Flink 1.12.0
降级集群解决了我的问题。