flink readcsvfile方法

hm2xizp9  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(276)

我的flink项目中有以下代码:

public class Test {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Event> events =
            env.readCsvFile(args[0]).pojoType(
               Event.class,
               "time",
               "vid",
               "speed",
               "xWay",
               "lane",
               "dir",
               "seg",
               "pos"
            );

        System.out.println("----> " + events.count());
    }
}

这就是我们班 Event :

class Event {
    public int time;
    public int vid;
    public int speed;
    public int xWay;
    public int lane;
    public int dir;
    public int seg;
    public int pos;

    public Event() { }

    public Event(int time_in, int vid_in, int speed_in, int xWay_in, int lane_in, int dir_in, int seg_in, int pos_in) {
        this.time = time_in;
        this.vid = vid_in;
        this.speed = speed_in;
        this.xWay = xWay_in;
        this.lane = lane_in;
        this.dir = dir_in;
        this.seg = seg_in;
        this.pos = pos_in;
    }
}

项目已编译,但运行时出现错误:

java.lang.ClassCastException: org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.PojoTypeInfo

csv文件有8个整数值,每行用逗号分隔。
该文档有以下示例:

DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                     .pojoType(Person.class, "name", "age", "zipcode");

我不知道pojo的定义是不是错了,肯定是的。我用它实现了我想要的 map 以及 readTextFile 但这可能更贵。

yrwegjxp

yrwegjxp1#

这个 ClassCastException 是一个很快就会被修复并被一个更有意义的异常所取代的bug。 Event 是一个 GenericType 而不是 PojoType . 我想原因可能是 Event 是成员类而不是全局可访问类。添加 static 修饰符应该能解决这个问题。

相关问题