kafka简单生产者不发送消息也不给出错误

gv8xihay  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(417)

我正在写一个简单的生产者我只是想把原始数据发送到一个主题。出于某种原因,我需要指定serialiser,它将消息转换为json,然后转换为utf-8,然后发送一条json消息。。
此代码不起作用(没有错误,但主题中没有要使用的内容)

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = "my_new_topic5"

producer.send(topic, b'test message')

此代码有效

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                     value_serializer=lambda x:
                     dumps(x).encode('utf-8'))

for e in range(2):
    data = {'number': e}
    producer.send('numtest', value=data)
    sleep(5)
gmxoilav

gmxoilav1#

试着也打电话 producer.flush() 之后 send() 以及 producer.close() 在终止程序之前。下面应该可以做到这一点:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = "my_new_topic5"

producer.send(topic, b'test message')
producer.flush()
producer.close()

相关问题