Flink: how to use fasterxml/jackson-dataformats-text to convert CSV to a POJO

0md85ypi, posted on 2021-06-21 in Flink

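My class receives a CSV record, and I need to extract its values to build a POJO. I do not have to open a "file.csv" from a directory: Flink passes the comma-separated elements to the EventDeserializationSchema, which uses the "Event class" to process each event.
For example:
Input: "'Adam','Smith',66,….'12:01:00.000'" -> Output: POJO
For this I am using: https://github.com/fasterxml/jackson-dataformats-text/tree/master/csv
This is my Event class, which should do the trick but does not actually do anything at the moment.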

import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {

    CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    CsvSchema schema = CsvSchema.emptySchema().withHeader();

    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);

    return Pojo
}

This is my POJO class:

public class Pojo {

        public String firstName;
        public String lastName;
        private int age;
        public String time;

        public Pojo(String firstName, String lastName, int age, String time) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
            this.time = time;
        }

}

Any help with getting the class to return the POJO would be greatly appreciated.
Here is a JSON example: https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/clickeventdeserializationschema.java
ClickEvent class: https://github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/kafkaevent.java#l27

8cdiaqws

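To make this work, you need to provide a default constructor and getters/setters for the fields. I don't understand what you are trying to do with Event here, or why there is also a Pojo, but assuming you want to deserialize the incoming string into an Event, something like this should work. The Event POJO class: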

import java.io.Serializable;

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
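
Why the no-argument constructor and setters matter can be illustrated with plain reflection. The following is a minimal sketch of what a bean-style data binder does by default, not Jackson's actual implementation; the class `BeanBindingSketch` and the helpers `canInstantiate` and `bind` are hypothetical names made up for this illustration. Jackson can alternatively be told to use an annotated constructor, which this sketch does not cover.

```java
import java.lang.reflect.Method;

public class BeanBindingSketch {

    // Bean-style class: public no-argument constructor plus getters/setters.
    public static class WithDefaultCtor {
        private String firstName;
        private int age;

        public WithDefaultCtor() {
        }

        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    // Same shape as the Pojo in the question: only an all-arguments constructor.
    public static class WithoutDefaultCtor {
        public String firstName;
        private int age;

        public WithoutDefaultCtor(String firstName, int age) {
            this.firstName = firstName;
            this.age = age;
        }
    }

    // Hypothetical helper: true if the class can be created through a no-argument constructor.
    public static boolean canInstantiate(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException lands here when no no-argument constructor exists.
            return false;
        }
    }

    // Hypothetical helper: create an empty bean first, then fill it through its setters.
    public static WithDefaultCtor bind(String firstName, int age) {
        try {
            WithDefaultCtor bean = WithDefaultCtor.class.getDeclaredConstructor().newInstance();
            Method setFirstName = WithDefaultCtor.class.getMethod("setFirstName", String.class);
            Method setAge = WithDefaultCtor.class.getMethod("setAge", int.class);
            setFirstName.invoke(bean, firstName);
            setAge.invoke(bean, age);
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not bind values to bean", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(WithDefaultCtor.class));
        System.out.println(canInstantiate(WithoutDefaultCtor.class));
        WithDefaultCtor bean = bind("Adam", 66);
        System.out.println(bean.getFirstName() + " " + bean.getAge());
    }
}
```

The second class cannot be created without knowing which value goes into which constructor argument, which is why the original Pojo, with only an all-arguments constructor, gives the mapper nothing to start from.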

The EventDeserializationSchema with a deserialize() implementation for this problem:

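import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class EventDeserializationSchema implements DeserializationSchema<Event> {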

    private static final long serialVersionUID = 1L;

    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
