我知道使用kafka consumers的api可以得到一个对应于特定时间戳的偏移量(getoffsetsbytimes())。我们如何获得偏移量并根据ksql或kstream中的时间戳从一个点开始重放流?
xienkqul1#
ksql中还不支持这一点。您可以选择使用 predicate 来筛选不需要的记录: SELECT * FROM SOURCE_STREAM WHERE ROWTIME > x 但根据偏移量相对于日志中数据量的位置,这种方法可能效率低下。
SELECT * FROM SOURCE_STREAM WHERE ROWTIME > x
1条答案
按热度按时间xienkqul1#
ksql中还不支持这一点。您可以选择使用 predicate 来筛选不需要的记录:
SELECT * FROM SOURCE_STREAM WHERE ROWTIME > x
但根据偏移量相对于日志中数据量的位置,这种方法可能效率低下。