我在我的类上接收到一个csv,我需要获取值来装箱一个pojo。我不必打开一个“file.csv”到一个目录中,逗号分隔的元素由flink传递到eventdeserializationschema,这个元素用于“event class”处理每个事件。
举个例子:
输入:“‘亚当’,‘史密斯’,66,….‘12:01:00.000’”->输出:pojo
为此,我使用:https://github.com/fasterxml/jackson-dataformats-text/tree/master/csv
这是我的事件类,应该做的把戏,实际上目前没有做任何事情。
import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
public class Event implements Serializable {
CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age", CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();
CsvSchema schema = CsvSchema.emptySchema().withHeader();
CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
ObjectMapper mapper = new CsvMapper();
mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
return Pojo
}
这是我的pojo课程:
public class Pojo {
public String firstName;
public String lastName;
private int age;
public String time;
public Pojo(String firstName, String lastName, int age, String time) {
this.firstName = firstName;
this.lastName = lastName;
this.age = age;
this.time =time;
}
}
任何能让全班同学退回pojo的帮助都将不胜感激。
这是一个json示例:https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/clickeventdeserializationschema.java
ClickEven类https://github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/kafkaevent.java#l27
1条答案
按热度按时间8cdiaqws1#
要使其工作,您需要为字段提供一个默认构造函数和getter/setters。我不明白你在这里要做什么
Event
为什么还有Pojo
,但假设要将传入的字符串反序列化到Event
,类似的操作应该可以:Event
pojo类:此问题的eventdeserializationschema
deserialize()
实施