我有一些应用程序逻辑,用pykafka用python编写,它是这样做的:
consumer = self.get_balanced_consumer()
while True:
msg = consumer.consume(block=False)
if not msg:
break
self._work(msg)
consumer.commit_offsets()
使用kafka connect api的等效逻辑是什么?
暂无答案!
目前还没有任何答案,快来回答吧!