如何使用aws kinesis发送和使用pojo

xdnvmnnf  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(380)

我要吃掉和Flink从凯西斯来的波乔。
对于如何正确发送和反序列化消息,有什么标准吗?
谢谢

nkkqxpd9

nkkqxpd91#

我决定:

  1. DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
  2. "my-stream",
  3. new POJODeserializationSchema(),
  4. kinesisConsumerConfig));

  1. public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
  2. private ObjectMapper mapper;
  3. @Override
  4. public SamplePojo deserialize(byte[] message) throws IOException {
  5. if (mapper == null) {
  6. mapper = new ObjectMapper();
  7. }
  8. SamplePojo retVal = mapper.readValue(message, SamplePojo.class);
  9. return retVal;
  10. }
  11. @Override
  12. public boolean isEndOfStream(SamplePojo nextElement) {
  13. return false;
  14. }
  15. }
展开查看全部

相关问题