我要吃掉和Flink从凯西斯来的波乔。对于如何正确发送和反序列化消息,有什么标准吗?谢谢
nkkqxpd91#
我决定:
DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>( "my-stream", new POJODeserializationSchema(), kinesisConsumerConfig));
DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
"my-stream",
new POJODeserializationSchema(),
kinesisConsumerConfig));
和
public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> { private ObjectMapper mapper; @Override public SamplePojo deserialize(byte[] message) throws IOException { if (mapper == null) { mapper = new ObjectMapper(); } SamplePojo retVal = mapper.readValue(message, SamplePojo.class); return retVal; } @Override public boolean isEndOfStream(SamplePojo nextElement) { return false; }}
public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
private ObjectMapper mapper;
@Override
public SamplePojo deserialize(byte[] message) throws IOException {
if (mapper == null) {
mapper = new ObjectMapper();
}
SamplePojo retVal = mapper.readValue(message, SamplePojo.class);
return retVal;
public boolean isEndOfStream(SamplePojo nextElement) {
return false;
1条答案
按热度按时间nkkqxpd91#
我决定:
和