如何限制Kafka消费者的记录数

q9rjltbz  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(587)

我正在使用confluent kafka rest产品来消费某个主题的记录。我的意图是只消耗topic的前100条记录。我正在使用下面的restapi来获取记录

GET /consumers/testgroup/instances/my_consumer/records

如何做到这一点?你知道吗?

qqrboqgw

qqrboqgw1#

据我所知,这是目前不可能的。正如在另一个答案中所提到的,您可以指定以字节为单位的最大大小(尽管在某些情况下代理实际上可以忽略这一点),但是您不能指定所需的消息数。
但是,这样的特性可以很容易地在客户机代码中实现。您可以猜测大致的大小,查询restapi并查看您收到了多少条消息。如果小于100,则再次查询它以获取接下来的几条消息,直到达到100。

cbeh67ev

cbeh67ev2#

如果您正试图使用来自您的消费组的新批100条消息,则应将max_bytes设置为一个值,对于您的数据模型,该值将始终返回大约100条记录。你可以有一个更保守的逻辑(得到更少,然后得到更多,直到截止到100),或者你可以总是得到更多,然后忽略。在这两种情况下,您都应该对您的消费群体采用手动补偿管理。

GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000

如果您收到超过100条消息,并且由于某种原因忽略了它们,那么如果启用了offset auto commit(它是在您创建使用者时定义的),您将不会在该使用者组上再次收到它们。你可能不想发生这种事!
如果您手动提交偏移量,那么如果您随后提交正确的偏移量以保证不会丢失任何消息,则可以忽略任何您想要的内容。您可以手动提交偏移量,如下所示:

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": <calculated offset ending where you stopped consuming for this partition>
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": <calculated offset ending where you stopped consuming for this partition>
    }
  ]
}

如果您正试图准确地获取主题的前100条记录,则需要在再次消费之前重置该主题和每个分区的消费组偏移量。您可以这样做(取自confluent):

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 0
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 0
    }
  ]
}
pgky5nke

pgky5nke3#

可以使用属性 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 用于配置 KafkaConsumer . 请看医生

相关问题