flink:java.lang.classnotfoundexception:org.apache.flink.api.common.serialization.deserializationschema

hkmswyz6  于 2021-06-01  发布在  Hadoop
关注(0)|答案(0)|浏览(838)

使用flink使用aws kinesis的数据。
我的代码:

public class kinesisConsumer {
public static void main(String[] args) throws Exception {
    Properties kinesisConsumerConfig = new Properties();
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "cn-northwest-1");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aaa");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "bbb");
    kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
            "test_data",
            new SimpleStringSchema(),
            kinesisConsumerConfig));

    kinesis.print();

    see.execute();
    }
}

我的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ne</groupId>
    <artifactId>kinesis</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>kinesisConsumer</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.5.2</version>
        </dependency>
    </dependencies>
</project>

我还下载并添加 flink-connector-kinesis_2.11-1.6.2.jar 进入图书馆。
然后我用以下方法 Package : mvn -DskipTests package . 但当我运行jar时,它返回:

Error: A JNI error has occurred, please check your installation and try again
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
            at java.lang.Class.getDeclaredMethods0(Native Method)
            at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
            at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
            at java.lang.Class.getMethod0(Class.java:3018)
            at java.lang.Class.getMethod(Class.java:1784)
            at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
            at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema
            at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
            ... 7 more

我的问题是:
为什么会出现这种错误?如何解决?
我正在消耗数据然后打印它,而不是将其保存到磁盘,为什么程序使用序列化代码?
感谢您的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题