自定义类
人
class Person
{
private Integer id;
private String name;
//getters and setters
}
Kafka-Flink连接器
TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", schema , getKafkaProperties()));
现在如果我发送下面的json
{ "id" : 1, "name": Synd }
通过kafka控制台生产者,flink代码抛出空指针异常,但是如果我使用 SimpleStringSchema
与前面定义的customschema不同,正在打印流。
上面的设置有什么问题
2条答案
按热度按时间am46iovg1#
回答有相同问题的人
自定义序列化程序
使用架构
piah890a2#
这个
TypeInformationSerializationSchema
是一个反序列化/序列化模式,它使用flink的序列化堆栈,因此也是它的序列化程序。因此,当使用此SerializationSchema
flink希望数据已经用flink的序列化程序序列化了Person
类型。鉴于
Person
同学们,Flink很可能会使用PojoTypeSerializer
. 此序列化程序无法理解馈送json输入数据。如果您想使用json作为输入格式,那么您必须定义自己的格式
DeserializationSchema
可以将json解析为Person
.