调试faust流处理-从主题开始重新启动应用程序

e5njpo68  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(451)

我正在调试一个简单的应用程序:

import faust

app = faust.App('app08')

# want to start from the beginning of the

# topic every time the application restarts

@app.agent(topic) 
async def process(stream):
    async for event in stream:
        print(event)

并希望在重新启动此应用程序时让代理从最早的偏移量读取。现在,它很聪明,知道最后读取的消息的位置,并在重新启动时从该位置开始。尽管在文档中搜索了一段时间,我还是找不到一个如何做到这一点的例子。我知道的唯一方法是更改应用程序名称,例如: app08app09 .

mzaanser

mzaanser1#

请记住,偏移量是由kafka服务器使用与您的faust应用程序同名的消费者组控制的,我一直在使用 kafaka-consumer-groups cli(kafka安装的一部分)来执行此操作。

kafka-consumer-groups --bootstrap-server kafka_bootstrap --reset-offsets --to-earliest --group faust_appname --execute --all-topics

您也可以替换 --to-earliest--to-datetime 并以以下格式提供时间戳 2020-09-20T00:00:00.00 如果你有一个相对较新版本的Kafka运行。
如果您想自动化这一点,我很高兴有pythonapi来自动控制消费群体。

相关问题