python生产者可以通过shell发送,但不能通过.py

qlzsbp2j  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(437)

我有一个正在运行并经过测试的kafka集群,正在尝试使用python脚本向代理发送消息。当我使用python3shell并调用producer方法时,这种方法是有效的,但是当我将这些相同的命令放入python文件并执行它时,脚本似乎挂起了。
我正在为消费者和生产者使用kafka python库。当我使用python3shell时,我可以看到消息出现在使用kafkaguitool2.0.4的主题中,我在python代码中尝试了各种循环和语句,但似乎没有什么能让它“运行”到完成。

>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>

这样就可以工作了,字节出现在代理主题数据中。
当我将上述代码放入python.py文件并使用python3执行时,代码就完成了,但没有数据发送到kafka代理。也没有显示错误。

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
pb3skfrl

pb3skfrl1#

如你所见,它返回了一个未来。
kafka客户端将批处理记录,它们不会一次立即发送一条记录,要做到这一点,您需要等待或刷新生产者缓冲区,以便它在应用程序退出之前发送。换句话说,交互终端将生产者数据保存在内存中,在后台运行,而另一种方式则丢弃该数据
如文件所示,显示

future = producer.send(...)

try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

或者干脆把 producer.flush() ,如果你不关心元数据或抓住未来。

相关问题