flink kafka-自定义类数据总是空的

mctunoxg  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(509)

自定义类

class Person
{
  private Integer id;
  private String name; 
 //getters and setters
}

Kafka-Flink连接器

TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", schema , getKafkaProperties()));

现在如果我发送下面的json

{ "id" : 1, "name": Synd }

通过kafka控制台生产者,flink代码抛出空指针异常,但是如果我使用 SimpleStringSchema 与前面定义的customschema不同,正在打印流。
上面的设置有什么问题

am46iovg

am46iovg1#

回答有相同问题的人
自定义序列化程序

class PersonSchema implements DeserializationSchema<Person>{

    private ObjectMapper mapper = new ObjectMapper(); //com.fasterxml.jackson.databind.ObjectMapper;

    @Override
    public Person deserialize(byte[] bytes) throws IOException {
        return mapper.readValue( bytes, Person.class );
    }

    @Override
    public boolean isEndOfStream(Person person) {
        return false;
    }

    @Override
    public TypeInformation<Person> getProducedType() {
        return TypeInformation.of(new TypeHint<Person>(){});
    }
}

使用架构

DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", new PersonSchema() , getKafkaProperties()));
piah890a

piah890a2#

这个 TypeInformationSerializationSchema 是一个反序列化/序列化模式,它使用flink的序列化堆栈,因此也是它的序列化程序。因此,当使用此 SerializationSchema flink希望数据已经用flink的序列化程序序列化了 Person 类型。
鉴于 Person 同学们,Flink很可能会使用 PojoTypeSerializer . 此序列化程序无法理解馈送json输入数据。
如果您想使用json作为输入格式,那么您必须定义自己的格式 DeserializationSchema 可以将json解析为 Person .

相关问题