kafka消费延迟

w9apscun  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(413)

我有以下问题:
我收到的消息由生产者产生的延迟(有时长达一分钟)。我对制作人没有控制权,我想确保我这边一切正常。
我在用 kafka-client 版本0.11.0.0。版本更改是可能的,但很乏味,所以我想尽量避免更改版本。
我的消费者配置如下所示:

group.id= unique-group-d
bootstrap.servers = # kafka broker with version 0.11.0.0
auto.offset.reset = latest

# avoid client id collision with Kafka

client.id= some-example-gateway-client

我正在调查这样的信息:

ConsumerRecords<String, String> tConsumerRecords = tempConsumer.poll(1000);
tempMessage.forEach(entry -> {
    [...]
});

与客户端或组id 100%没有名称冲突。我订阅的主题只有一个分区。
我怀疑制作者是在重负下,这是一个独立的环境与零星的消息开始生产的生产者。
今天是星期五,所有人都已经走了,这也于事无补。
我错过什么了吗?我能把责任推给制片人吗?

guicsvcw

guicsvcw1#

从kafka v0.10开始,每条消息都包含一个元数据时间戳属性,该属性要么由生产者在消息创建时设置,要么由代理在消息插入时设置。
提供了一个接口 org.apache.kafka.streams.processor.TimestampExtractor 它有一个方法 extract 从记录中提取时间戳的。
基本上,您只需要比较生产者端和消费者端的时间戳,以确定消息是从生产者端还是从消费者端延迟的。
source:httpshttp://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/timestampextractor.html
https://kafka.apache.org/documentation/#topicconfigs

相关问题