在将retention.ms更改为100ms后,我想检查我的主题是否为空。我尝试这样使用我的主题:
consumer_kafka = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='latest',
consumer_timeout_ms=10000)
for message in consumer_kafka:
if message is None:
return True
else:
return False
如果返回true(主题已清除),那么我将发送我的json文件。但此方法不返回任何值。有人能帮我吗?。
暂无答案!
目前还没有任何答案,快来回答吧!