keyerror:kafka.producer.record\u accumulator.recordbatch

wljmcqd8  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(734)

使用kafkapythonapi向主题发送一堆消息。部分消息成功发送到主题,但并非所有消息都在程序终止前发送,并显示以下错误消息:

KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290>
Batch is already closed -- ignoring batch.done()
Error processing errback
Traceback (most recent call last):
  File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
    self._complete_batch(batch, error, -1, None)
  File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
    self._accumulator.deallocate(batch)
  File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate
    self._incomplete.remove(batch)
  File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove
    return self._incomplete.remove(batch)

在我的主题中,每次运行都会收到不同数量的消息。问题似乎是kafka producer.send调用无法在程序结束前完成发送。
根据kafka documentations producer.send是一种异步方法,这可能是根本原因-并非所有异步线程都在进程终止之前完成发送:
send()方法是异步的。调用时,它将记录添加到挂起记录发送的缓冲区中,并立即返回。这允许制作者将单个记录批处理在一起以提高效率。
有许多简单的解决方案(例如设置 batch.size 可能导致性能瓶颈。
如何在不影响性能的前提下解决这个问题?

aiazj4mn

aiazj4mn1#

打个电话就行了 producer.flush() 在出口前。

相关问题