key\u index-changelog

zaqlnxep  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(277)

我在python中使用了kafka的faust包。我想收集值并将它们放入跳频表中。启动应用程序时,出现以下错误:
[错误]在群集元数据中找不到topic test.kafka-datatable-key\u index-changelog
如果我在服务器上手动创建主题,我的应用程序就会正常工作。但我希望在启动应用程序时自动创建主题。

def window_processor(key, events):
    print(f"key: {key}, values: {events}")

table = app.Table('DataTable', default=list, partitions=1, on_window_close=window_processor)
table = table.hopping(10, 5, expires=timedelta(seconds=10), key_index=True)

@app.timer(1.0)
async def my_job():
    data = await getValues()
    for value in data:
        await queue.send(key=value['name'], value=value['value'])

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题