如何使用aws kinesis发送和使用pojo

xdnvmnnf  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(370)

我要吃掉和Flink从凯西斯来的波乔。
对于如何正确发送和反序列化消息,有什么标准吗?
谢谢

nkkqxpd9

nkkqxpd91#

我决定:

DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
        "my-stream",
        new POJODeserializationSchema(),
        kinesisConsumerConfig));

public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
    private ObjectMapper mapper;

    @Override
    public SamplePojo deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }

        SamplePojo retVal = mapper.readValue(message, SamplePojo.class);

        return retVal;
    }

    @Override
    public boolean isEndOfStream(SamplePojo nextElement) {
        return false;
    }
}

相关问题