我正在尝试使用beam 2.27/flink 1.12提交流媒体作业,使用以下maven命令行:
mvn exec:java -Dexec.mainClass=org.example.MyPipelineClass -Pflink-runner -Dexec.args="--runner=FlinkRunner --flinkMaster=flink-host:8081 --filesToStage=target/pipelines-bundled-1.0.0.jar"
有一段时间一切都很顺利,但我最近做了一些编辑,希望运行管道的新版本,现在出现以下错误:
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local class incompatible: stream classdesc serialVersionUID = -3137689219135046939, local class serialVersionUID = 3698633776553163849
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
at org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:263)
... 21 more
my pom.xml的外观如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>pipelines</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<beam.version>2.27.0</beam.version>
<flink.version>1.12</flink.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>flink-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-${flink.version}</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- note that the pipeline runs on GCP -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- main output is JDBC/Postgres -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.4</version>
</dependency>
</dependencies>
</project>
当然,在提交作业之前,我使用 mvn clean package -Pflink-runner
.
flink自托管在gke示例上,并使用 flink:1.12.4-java8
形象。
我发现这个问题似乎有相同的错误信息,但没有明确的违规者。
如果有任何需要探索的帮助或想法,我将不胜感激,我不知道该调查什么。
1条答案
按热度按时间bttbmeg01#
我发现这是我自己的问题。在
pom.xml
在上面,我指定梁流道为beam-runners-flink-1.12
,群集运行的是flink 1.12.4梁的flink runner未指定面片,而是使用flink 1.12.0
降级集群解决了我的问题。