使用kafkapythonapi向主题发送一堆消息。部分消息成功发送到主题,但并非所有消息都在程序终止前发送,并显示以下错误消息:
KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290>
Batch is already closed -- ignoring batch.done()
Error processing errback
Traceback (most recent call last):
File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs
f(value)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
self._complete_batch(batch, error, -1, None)
File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
self._accumulator.deallocate(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate
self._incomplete.remove(batch)
File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove
return self._incomplete.remove(batch)
在我的主题中,每次运行都会收到不同数量的消息。问题似乎是kafka producer.send调用无法在程序结束前完成发送。
根据kafka documentations producer.send是一种异步方法,这可能是根本原因-并非所有异步线程都在进程终止之前完成发送:
send()方法是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中,并立即返回。这允许制作者将单个记录批处理在一起以提高效率。
有许多简单的解决方案(例如设置 batch.size
可能导致性能瓶颈。
如何在不影响性能的前提下解决这个问题?
1条答案
按热度按时间aiazj4mn1#
打个电话就行了
producer.flush()
在出口前。