我有一个项目,我消费Kafka的数据。显然,有几个字段将包含在标题中,我也需要为每条消息阅读这些字段。目前在Flink有办法做到这一点吗?谢谢!
lndjwyie1#
我也遇到了类似的问题,并在Flink1.8中找到了一种方法。以下是我写的:
FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer("topic", new JSONKeyValueDeserializationSchema(true){ ObjectMapper mapper = new ObjectMapper(); @Override public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception { ObjectNode result = super.deserialize(record); if (record.headers() != null) { Map<String, JsonNode> headers = StreamSupport.stream(record.headers().spliterator(), false).collect(Collectors.toMap(h -> h.key(), h -> (JsonNode)this.mapper.convertValue(new String(h.value()), JsonNode.class))); result.set("headers", mapper.convertValue(headers, JsonNode.class)); } return result; } }, kafkaProps);
希望这有帮助!
w41d8nur2#
@实际上,Kafka补充道 Header 自版本以来的概念 0.11.0.0 . https://issues.apache.org/jira/browse/kafka-4208问题是 flink-connector-kafka-0.11_2.11 随附的 flink-1.4.0 ,应该支持 kafka-0.11.0.0 只是在阅读Kafka的文章时忽略了邮件头。所以不幸的是,除非您在flin中实现自己的kafkaconsumer,否则无法读取这些头文件。我也对Kafka消息头中的readin感兴趣,希望flink团队能对此提供支持。
Header
0.11.0.0
flink-connector-kafka-0.11_2.11
flink-1.4.0
kafka-0.11.0.0
2条答案
按热度按时间lndjwyie1#
我也遇到了类似的问题,并在Flink1.8中找到了一种方法。以下是我写的:
希望这有帮助!
w41d8nur2#
@实际上,Kafka补充道
Header
自版本以来的概念0.11.0.0
. https://issues.apache.org/jira/browse/kafka-4208问题是
flink-connector-kafka-0.11_2.11
随附的flink-1.4.0
,应该支持kafka-0.11.0.0
只是在阅读Kafka的文章时忽略了邮件头。所以不幸的是,除非您在flin中实现自己的kafkaconsumer,否则无法读取这些头文件。
我也对Kafka消息头中的readin感兴趣,希望flink团队能对此提供支持。