我正在调试一个简单的应用程序:
import faust
app = faust.App('app08')
# want to start from the beginning of the
# topic every time the application restarts
@app.agent(topic)
async def process(stream):
async for event in stream:
print(event)
并希望在重新启动此应用程序时让代理从最早的偏移量读取。现在,它很聪明,知道最后读取的消息的位置,并在重新启动时从该位置开始。尽管在文档中搜索了一段时间,我还是找不到一个如何做到这一点的例子。我知道的唯一方法是更改应用程序名称,例如: app08
至 app09
.
1条答案
按热度按时间mzaanser1#
请记住,偏移量是由kafka服务器使用与您的faust应用程序同名的消费者组控制的,我一直在使用
kafaka-consumer-groups
cli(kafka安装的一部分)来执行此操作。您也可以替换
--to-earliest
与--to-datetime
并以以下格式提供时间戳2020-09-20T00:00:00.00
如果你有一个相对较新版本的Kafka运行。如果您想自动化这一点,我很高兴有pythonapi来自动控制消费群体。