我想让两个faust代理监听同一个kafka主题,但是每个代理在处理事件之前使用自己的过滤器,并且它们的事件集不相交。
在文档中,我们有一个示例:https://faust.readthedocs.io/en/latest/userguide/streams.html#id4
如果两个代理使用订阅到同一主题的流:
topic = app.topic('orders')
@app.agent(topic)
async def processA(stream):
async for value in stream:
print(f'A: {value}')
@app.agent(topic)
async def processB(stream):
async for value in stream:
print(f'B: {value}')
指挥者将把在“订单”主题上收到的每条消息转发给两个代理,每当它进入代理流时就增加引用计数。
当事件被确认时,引用计数减少,当它达到零时,使用者将认为偏移量“已完成”,并可以提交它。
以下为过滤器https://faust.readthedocs.io/en/latest/userguide/streams.html#id13:
@app.agent() async def process(stream):
async for value in stream.filter(lambda: v > 1000).group_by(...):
...
我使用了一些复杂的过滤器,但结果是将流分成两部分,用于两个逻辑完全不同的代理(我不使用分组方式)
如果两个特工一起工作,一切都会好的。但是,如果我停止它们并重新启动,它们将从头开始处理流。因为每个事件都没有被一个特工确认。如果我确认每个代理中的所有事件,当其中一个代理不启动时,第二个代理将清除主题(如果一个被压碎并重新启动,售票员将看到三个订户,因为它正在等待20分钟(压碎的代理等待响应)。
我只想把事件分为两部分。在这种情况下,如何进行适当的同步?
暂无答案!
目前还没有任何答案,快来回答吧!