我有一个正在运行并经过测试的kafka集群,正在尝试使用python脚本向代理发送消息。当我使用python3shell并调用producer方法时,这种方法是有效的,但是当我将这些相同的命令放入python文件并执行它时,脚本似乎挂起了。
我正在为消费者和生产者使用kafka python库。当我使用python3shell时,我可以看到消息出现在使用kafkaguitool2.0.4的主题中,我在python代码中尝试了各种循环和语句,但似乎没有什么能让它“运行”到完成。
>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
这样就可以工作了,字节出现在代理主题数据中。
当我将上述代码放入python.py文件并使用python3执行时,代码就完成了,但没有数据发送到kafka代理。也没有显示错误。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
1条答案
按热度按时间pb3skfrl1#
如你所见,它返回了一个未来。
kafka客户端将批处理记录,它们不会一次立即发送一条记录,要做到这一点,您需要等待或刷新生产者缓冲区,以便它在应用程序退出之前发送。换句话说,交互终端将生产者数据保存在内存中,在后台运行,而另一种方式则丢弃该数据
如文件所示,显示
或者干脆把
producer.flush()
,如果你不关心元数据或抓住未来。