java在本地执行flink示例程序

iq0todco  于 2021-06-24  发布在  Flink
关注(0)|答案(3)|浏览(441)

我试图在本地模式下用apache flink执行一个示例程序。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");
        //DataSet<String> text1 = env.readTextFile(args[0]);

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
        env.execute();

        env.execute("Word Count Example");
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

这给了我一个例外:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/InputFormat
    at WordCountExample.main(WordCountExample.java:10)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.InputFormat
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 1 more

我做错什么了?
我也用了正确的jar。flink-java-0.9.0-milestone-1.jar flink-clients-0.9.0-milestone-1.jar flink-core-0.9.0-milestone-1.jar

hi3rlvi2

hi3rlvi21#

首先,您在项目中包含的flink jar文件不够,请包含flink源文件夹下的lib文件夹中的所有jar文件。
其次,“env.execute();env.execute(“字数示例”);“这些代码行不是必需的,因为您只是将数据集打印到控制台上;您没有将输出写入文件(.txt、.csv等)。因此,最好删除这些行(如果不需要(观察了很多次),如果包含在代码中,有时会抛出错误)
**第三,**在从ide导出java项目的jar文件时,不要忘记选择“main”类。

希望在进行上述更改之后,您的代码能够正常工作。

omqzjyyz

omqzjyyz2#

将这三个flinkjar文件作为依赖项添加到项目中是不够的,因为它们还有其他可传递的依赖项,例如在hadoop上。
获得开发(和本地执行)flink程序的工作设置的最简单方法是遵循quickstart指南,该指南使用maven原型来配置maven项目。这个maven项目可以导入到ide中。

rqmkfv5c

rqmkfv5c3#

noclassdeffounderror扩展linkageerror
如果java虚拟机或classloader示例尝试加载类的定义(作为普通方法调用的一部分或使用新表达式创建新示例的一部分),但找不到该类的定义,则引发。编译当前执行的类时,已存在搜索的类定义,但无法再找到该定义。
您的代码/jar依赖于hadoop。在这里找到它下载jar文件并将其添加到classpath org.apache.hadoop.mapreduce.inputformat中

相关问题