我使用的是apachekafka(无合流),有一个简单的kafka生产者,它从restapi提取数据,将数据发送到kafka并关闭。我在开发一个消费品的时候用这个做测试。在consumer中,我可以跟踪偏移量,但似乎无法在producer中设置自定义偏移量。在rest调用中,我需要跟踪一个日期,这样我就不会一直提取相同的数据。我是被迫自己存储“最后一个时间戳”还是遗漏了什么?
j2datikz1#
我认为在您的场景中,您对生产者端的“kafka”偏移量(当您向kafka写入时)不感兴趣,而对跟踪您从restapi提取的最新数据的“偏移量”感兴趣,所以您是对的,您必须自己做这件事。在kafka producer端,您可以知道分配给最新发送消息的偏移量(在recordmetadata内部),但它与从restapi提取数据时的最新时间戳没有关系。
1条答案
按热度按时间j2datikz1#
我认为在您的场景中,您对生产者端的“kafka”偏移量(当您向kafka写入时)不感兴趣,而对跟踪您从restapi提取的最新数据的“偏移量”感兴趣,所以您是对的,您必须自己做这件事。在kafka producer端,您可以知道分配给最新发送消息的偏移量(在recordmetadata内部),但它与从restapi提取数据时的最新时间戳没有关系。