(说到Kafka和Kafka,我完全是初学者,如果这是个愚蠢的问题,我很抱歉)我有一个问题,我们有一个主题,保留48小时的数据(数百万条记录);我想知道从这个主题中获得最后“20分钟”的数据,然后再传输新消息的最佳方法。本主题中的每条消息都是json,并且有一个以unix毫秒为单位的时间戳(从epoch(utc)开始)。性能显然是一个问题
r9f1avp51#
java客户机中有一个工具可以通过时间戳来寻找偏移量。Kafka伊对此有公关,但似乎没有得到证实和合并。我想是吧 node-rdkafka 是的。下面是一个例子(参考)
node-rdkafka
consumer.offsetsForTimes( [ {topic: 'hi', partition: 0, offset: Date.now() - (20*60*1000) } ], timeout, console.log );
当你得到偏移量时,你可以找到它们并开始阅读。
1条答案
按热度按时间r9f1avp51#
java客户机中有一个工具可以通过时间戳来寻找偏移量。Kafka伊对此有公关,但似乎没有得到证实和合并。
我想是吧
node-rdkafka
是的。下面是一个例子(参考)当你得到偏移量时,你可以找到它们并开始阅读。