我正在使用ApacheFlink,并尝试通过使用ApacheKafka协议来连接到AzureEventHub以接收来自它的消息。我设法连接到azureeventhub并接收消息,但我不能使用flink特性“setstartfromtimestamp(…)”,如这里所述(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-消费者开始位置配置)。当我试图从timestamp获取一些消息时,kafka说代理端的消息格式早于0.10.0。有人面临这个问题吗?apache kafka客户端版本为2.0.1 apache flink版本为1.7.2
更新:尝试使用azure事件中心快速启动示例(https://github.com/azure/azure-event-hubs-for-kafka/tree/master/quickstart/java)在消费者软件包中添加了获取带有时间戳的偏移量的代码,如果消息版本低于0.10.0 kafka版本,它将按预期返回null。
List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
System.out.println(offsetAndTimestamp);
1条答案
按热度按时间klh5stk11#
很抱歉我们错过了。kafka offsetsfortimes()现在在eh中受支持(以前不支持)。
请随时打开一个问题对我们的github在未来。https://github.com/azure/azure-event-hubs-for-kafka