GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000
如果您收到超过100条消息,并且由于某种原因忽略了它们,那么如果启用了offset auto commit(它是在您创建使用者时定义的),您将不会在该使用者组上再次收到它们。你可能不想发生这种事! 如果您手动提交偏移量,那么如果您随后提交正确的偏移量以保证不会丢失任何消息,则可以忽略任何您想要的内容。您可以手动提交偏移量,如下所示:
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": <calculated offset ending where you stopped consuming for this partition>
},
{
"topic": "test",
"partition": 1,
"offset": <calculated offset ending where you stopped consuming for this partition>
}
]
}
3条答案
按热度按时间qqrboqgw1#
据我所知,这是目前不可能的。正如在另一个答案中所提到的,您可以指定以字节为单位的最大大小(尽管在某些情况下代理实际上可以忽略这一点),但是您不能指定所需的消息数。
但是,这样的特性可以很容易地在客户机代码中实现。您可以猜测大致的大小,查询restapi并查看您收到了多少条消息。如果小于100,则再次查询它以获取接下来的几条消息,直到达到100。
cbeh67ev2#
如果您正试图使用来自您的消费组的新批100条消息,则应将max_bytes设置为一个值,对于您的数据模型,该值将始终返回大约100条记录。你可以有一个更保守的逻辑(得到更少,然后得到更多,直到截止到100),或者你可以总是得到更多,然后忽略。在这两种情况下,您都应该对您的消费群体采用手动补偿管理。
如果您收到超过100条消息,并且由于某种原因忽略了它们,那么如果启用了offset auto commit(它是在您创建使用者时定义的),您将不会在该使用者组上再次收到它们。你可能不想发生这种事!
如果您手动提交偏移量,那么如果您随后提交正确的偏移量以保证不会丢失任何消息,则可以忽略任何您想要的内容。您可以手动提交偏移量,如下所示:
如果您正试图准确地获取主题的前100条记录,则需要在再次消费之前重置该主题和每个分区的消费组偏移量。您可以这样做(取自confluent):
pgky5nke3#
可以使用属性
ConsumerConfig.MAX_POLL_RECORDS_CONFIG
用于配置KafkaConsumer
. 请看医生