java—在ApacheFlink中从kafka头读取数据的方法

wwwo4jvm  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(479)

我有一个项目,我消费Kafka的数据。显然,有几个字段将包含在标题中,我也需要为每条消息阅读这些字段。目前在Flink有办法做到这一点吗?
谢谢!

lndjwyie

lndjwyie1#

我也遇到了类似的问题,并在Flink1.8中找到了一种方法。以下是我写的:

FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer("topic", new JSONKeyValueDeserializationSchema(true){
    ObjectMapper mapper = new ObjectMapper();
    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        ObjectNode result = super.deserialize(record);
        if (record.headers() != null) {
            Map<String, JsonNode> headers = StreamSupport.stream(record.headers().spliterator(), false).collect(Collectors.toMap(h -> h.key(), h -> (JsonNode)this.mapper.convertValue(new String(h.value()), JsonNode.class)));
            result.set("headers", mapper.convertValue(headers, JsonNode.class));
        }

        return result;
    }
}, kafkaProps);

希望这有帮助!

w41d8nur

w41d8nur2#

@实际上,Kafka补充道 Header 自版本以来的概念 0.11.0.0 . https://issues.apache.org/jira/browse/kafka-4208
问题是 flink-connector-kafka-0.11_2.11 随附的 flink-1.4.0 ,应该支持 kafka-0.11.0.0 只是在阅读Kafka的文章时忽略了邮件头。
所以不幸的是,除非您在flin中实现自己的kafkaconsumer,否则无法读取这些头文件。
我也对Kafka消息头中的readin感兴趣,希望flink团队能对此提供支持。

相关问题