我们有一个Kafka消费者将阅读消息,这样做的东西,并再次发布到Kafka主题使用下面的脚本
生产者配置:
{
"bootstrap.servers": "localhost:9092"
}
我没有像这样配置任何其他配置 queue.buffering.max.messages
queue.buffering.max.ms batch.num.messages
我假设这些都将是配置中的默认值
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
我的理解是:当内部队列到达queue.buffering.max.ms或batch.num.messages时,消息将在单独的线程中发布到kafka。在我的配置队列中,queue.buffering.max.ms是0,所以当我调用product()时,每个消息都将被发布。如果我错了,请纠正我。
我的制片人片段:
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.flush()
从这篇文章中我了解到,在每条消息之后使用flush,producer将成为sync producer。如果我使用上面的脚本,它需要45毫秒才能发布到Kafka
如果我把上面的片段改成
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.poll(0)
是否有任何性能将得到改进?你能澄清我的理解吗。
谢谢
1条答案
按热度按时间cl25kdpy1#
两者的区别
flush()
以及poll()
在客户文件中解释。为了
flush()
,它指出:等待生产者队列中的所有消息被传递。这是一个方便的方法,它调用poll(),直到len()为零或经过可选超时。
为了
poll()
:轮询生产者的事件并调用相应的回调(如果已注册)。
打电话
poll()
就在send()
不使生产者同步,因为刚刚发送的消息不太可能已经到达代理并且传递报告已经发送回客户端。相反
flush()
将阻止,直到先前发送的消息已被传递(或出错),有效地使生产者同步。