我有以下问题:
我收到的消息由生产者产生的延迟(有时长达一分钟)。我对制作人没有控制权,我想确保我这边一切正常。
我在用 kafka-client
版本0.11.0.0。版本更改是可能的,但很乏味,所以我想尽量避免更改版本。
我的消费者配置如下所示:
group.id= unique-group-d
bootstrap.servers = # kafka broker with version 0.11.0.0
auto.offset.reset = latest
# avoid client id collision with Kafka
client.id= some-example-gateway-client
我正在调查这样的信息:
ConsumerRecords<String, String> tConsumerRecords = tempConsumer.poll(1000);
tempMessage.forEach(entry -> {
[...]
});
与客户端或组id 100%没有名称冲突。我订阅的主题只有一个分区。
我怀疑制作者是在重负下,这是一个独立的环境与零星的消息开始生产的生产者。
今天是星期五,所有人都已经走了,这也于事无补。
我错过什么了吗?我能把责任推给制片人吗?
1条答案
按热度按时间guicsvcw1#
从kafka v0.10开始,每条消息都包含一个元数据时间戳属性,该属性要么由生产者在消息创建时设置,要么由代理在消息插入时设置。
提供了一个接口
org.apache.kafka.streams.processor.TimestampExtractor
它有一个方法extract
从记录中提取时间戳的。基本上,您只需要比较生产者端和消费者端的时间戳,以确定消息是从生产者端还是从消费者端延迟的。
source:httpshttp://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/timestampextractor.html
https://kafka.apache.org/documentation/#topicconfigs