我尝试在Flink中从Kafka Topic接收和访问JSON数据。工作原理是生成数据,将其发送到Kafka Topic,然后在Flink中以字符串形式接收。但我想以面向对象的方式访问数据(例如,从每条消息中提取特定属性)?
因此,我有一个Kafka Producer,它向Kafka Topic发送数据(例如,每隔1秒):
ObjectMapper test = new ObjectMapper();
ObjectNode jNode= test.createObjectNode();
jNode.put("LoPos", longPos)
.put("LaPos", latPos)
.put("Timestamp", timestamp.toString());
ProducerRecord<String, ObjectNode> rec = new ProducerRecord<String, ObjectNode>(topicName, jNode);
producer.send(rec);
所以JSON数据看起来像这样:
{"LoPos":10.5,"LaPos":2.5,"Timestamp":"2022-10-31 12:45:19.353"}
工作原理是,接收数据并将其打印为字符串:
DataStream<String> input =
env.fromSource(
KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setBounded(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setTopics(topicName)
.build(),
WatermarkStrategy.noWatermarks(),
"kafka-source");
将数据打印为字符串:
DataStream<String> parsed = input.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) {
System.out.println(value);
return "test";
我如何在Flink中接收数据并以面向对象的方式访问它(例如从每条消息中提取LoPos)?您会推荐哪种方法?我用JSONValueDeserializationSchema尝试过,但没有成功...
谢谢
1条答案
按热度按时间68bkxrlz1#
one of the recipes in the Immerok Apache Flink Cookbook中介绍了此主题。
在下面的例子中,我假设
Event
是一个Flink POJO。对于Flink 1.15或更早版本,您应该使用自定义反序列化程序:
反序列化程序可以是这样的:
我们在Flink 1.16中简化了这个过程,我们添加了一个合适的
JsonDeserializationSchema
,您可以用途:免责声明:我为Immerok工作。