apache-kafka: Receiving JSON as input in Apache Flink

mwkjh3gx · posted 2022-11-01 in Apache

I am trying to receive and access JSON data from a Kafka topic in Flink. What already works: producing the data, sending it to the Kafka topic, and receiving it in Flink as a String. But I want to access the data in an object-oriented way (e.g. extract a specific attribute from each message).
So I have a Kafka producer that sends data to the Kafka topic (e.g. every second):

ObjectMapper test = new ObjectMapper();
ObjectNode jNode= test.createObjectNode();
jNode.put("LoPos", longPos)
    .put("LaPos", latPos)
    .put("Timestamp", timestamp.toString());

ProducerRecord<String, ObjectNode> rec =
        new ProducerRecord<String, ObjectNode>(topicName, jNode);
producer.send(rec);
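
The producer setup itself is not shown in the question; for a ProducerRecord<String, ObjectNode> to be sent, the producer must have been created with a JSON-capable value serializer. A minimal sketch of one plausible setup, assuming the Kafka Connect JsonSerializer (org.apache.kafka.connect.json.JsonSerializer, a Serializer<JsonNode> from the connect-json artifact) is on the classpath:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// String keys; the ObjectNode value is serialized to JSON bytes.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.connect.json.JsonSerializer");
KafkaProducer<String, ObjectNode> producer = new KafkaProducer<>(props);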

So the JSON data looks like this:

{"LoPos":10.5,"LaPos":2.5,"Timestamp":"2022-10-31 12:45:19.353"}

What works is receiving the data and printing it as a String:

DataStream<String> input =
        env.fromSource(
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setBounded(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .setTopics(topicName)
                        .build(),
                WatermarkStrategy.noWatermarks(),
                "kafka-source");

Printing the data as a String:

DataStream<String> parsed = input.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(String value) {
        System.out.println(value);
        return "test";
    }
});

How can I receive the data in Flink and access it in an object-oriented way (e.g. extract LoPos from each message)? Which approach would you recommend? I tried it with JSONValueDeserializationSchema, but without success...
Thanks

68bkxrlz #1

This topic is covered by one of the recipes in the Immerok Apache Flink Cookbook.
In the examples below, I am assuming that Event is a Flink POJO.
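
For reference, a hypothetical Event POJO matching the JSON in the question could look like the sketch below; the field names mirror the JSON keys, and keeping Timestamp as a String is an assumption (the sample value is not ISO-8601, so Jackson could not parse it into an Instant without a custom format):

// Hypothetical POJO for {"LoPos":10.5,"LaPos":2.5,"Timestamp":"..."}.
// Flink POJO rules: public class, public no-arg constructor,
// and public fields (or getters/setters).
public class Event {
    public double LoPos;
    public double LaPos;
    public String Timestamp;

    public Event() {}
}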
With Flink 1.15 or earlier, you should use a custom deserializer:

KafkaSource<Event> source =
    KafkaSource.<Event>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics(TOPIC)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new EventDeserializationSchema())
        .build();

The deserializer can look something like this:

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    /**
     * For performance reasons it's better to create one ObjectMapper in this open method rather
     * than creating a new ObjectMapper for every record.
     */
    @Override
    public void open(InitializationContext context) {
        // JavaTimeModule is needed for Java 8 date/time (Instant) support
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    /**
     * If our deserialize method needed access to the information in the Kafka headers of a
     * KafkaConsumerRecord, we would have implemented a KafkaRecordDeserializationSchema instead of
     * extending AbstractDeserializationSchema.
     */
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }
}

We simplified this in Flink 1.16 by adding a proper JsonDeserializationSchema that you can use:

KafkaSource<Event> source =
    KafkaSource.<Event>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics(TOPIC)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new JsonDeserializationSchema<>(Event.class))
        .build();
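
Either way, the source produces a DataStream<Event>, so individual attributes can be accessed directly. A minimal sketch, using the hypothetical Event fields from above:

DataStream<Event> events =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");

// Extract a single attribute (LoPos) from each message, as asked:
events.map(event -> event.LoPos).print();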

Disclaimer: I work for Immerok.
