使用kafka连接器运行flink时发生noclassdeffounderror

whhtz7ly  于 2021-06-25  发布在  Flink
关注(0)|答案(5)|浏览(475)

我正在尝试使用flink从Kafka传输数据。我的代码编译时没有错误,但运行时出现以下错误:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: 
    org/apache/flink/streaming/util/serialization/DeserializationSchema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

我的pom依赖列表如下:

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-core</artifactId>
            <version>0.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>0.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>  
    </dependencies>

我尝试运行的java代码只是订阅了一个名为“streamer”的kafka主题:

import java.util.Properties;
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class StreamConsumer {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "samplegroup");

        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>("streamer", new SimpleStringSchema(), properties));

        messageStream.rebalance().map(new MapFunction<String, String>() {
                        private static final long serialVersionUID = -6867736771747690202L;
                        @Override
                        public String map(String value) throws Exception {
                                return "Streamed data: " + value;
                        }
                }).print();
        env.execute();
}
}

系统信息:
1.Kafka版本:0.9.0.1
2.flink版本:1.3.2
3.openjdk版本:1.8
虽然我使用的是maven,但我不认为这是maven的问题,因为即使在没有maven的情况下尝试,我也会遇到同样的错误。我手动将所有必需的.jar文件下载到一个文件夹中,并在使用javac编译时使用-cp选项指定该文件夹路径。我在运行时得到与上面相同的错误,但在编译时没有错误。

m0rkklqb

m0rkklqb1#

如果在pom文件中的某个位置指定了作用域,请尝试删除作用域,因为它在运行时会限制类文件的作用域
在pom文件中限制作用域

polhcujo

polhcujo2#

我也有同样的问题。对我来说,我曾用

mvn clean package -Pbuild-jar
czfnxgou

czfnxgou3#

我弄明白了原因,现在看来这真是个愚蠢的错误。在我的例子中,jar包在运行时不可用。我最后一点也没用maven。我是用 javac -cp <path_to_jar_files> 又被处决了 java -cp <path_to_jar_files>

wpx232ag

wpx232ag4#

使用kafka连接器运行flink时出现noclassdeffounderror
你的代码编译了,并且你得到了noclassdeffounderror,我认为你的一个依赖库在maven.pom的自动下载过程中丢失了它的编译依赖项或运行时依赖项
所以这可能是你得noclassdeffounderror的根本原因
解决方案:清洁和建造

abithluo

abithluo5#

您的pom的第一个问题似乎是您对flink导入使用了不同的版本。尝试使用更新的版本 1.3.2 所有flink模块。当您使用不兼容或多个版本的库时,经常会发生此错误。
尝试使用衰减依赖项(假设您使用的是scala 2.11):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>  
</dependencies>

如果你仍然有同样的问题后,示例代码,这样我可以重现错误。

相关问题