我希望能够使用来自Kafka代理的消息或包含数据的本地文件,如何使用Faust实现这一点,而不用Faust编写一个非常相似的函数,只使用一个简单的for循环来迭代消息?
或者在这种情况下只避开浮士德更好?还在学习这一切,甚至不确定是否应该这样做。
@app.agent(input_topic)
async def myagent(messages):
async for item in stream:
result = do_something(item)
await output_topic.send(result)
我如何修改这个代码块,使其能够接受来自给定文件/列表的消息(取决于将要设置的配置变量)?或者将来自文件/列表的消息发送到输入主题?
1条答案
按热度按时间lbsnaicq1#
正如你所说,你不需要浮士德。(另外,它不能读取文件)。
使用
kafka-python
、aiokafka
等。像处理任何其他文件一样使用open('file')
,读取该文件,然后从中生成数据