合流kafka python库为批量消息配置生产者

kmpatx3s  于 2021-05-29  发布在  Kafka
关注(0)|答案(2)|浏览(668)

我需要设置Kafka生产者发送一批不是由味精味精,而是批量进口500毫克。我查过了https://github.com/dpkp/kafka-python/issues/479 试过了 producer.send_messages(topic, *message) 但它失败了:

>       producer.send_messages('event_connector_mt', *load_entries)
E       AttributeError: 'cimpl.Producer' object has no attribute 'send_messages'

我也试着通过了 producer.produce(topic, *message) 失败原因:

producer.produce('event_connector_mt', *load_entries)
E       TypeError: function takes at most 8 arguments (501 given)

所以我深入研究发现,我必须在producer配置中将类型设置为async,batch.size设置为大于default,但是当我尝试如下配置时:

from confluent_kafka import Consumer, Producer       

producer = Producer(**{'bootstrap.servers': KAFKA_BROKERS,
                       'queue.buffering.max.messages': 1000000,
                       'batch.num.messages': 500,
                       'batch.size': 19999,
                       'producer.type': 'async'
                       })

失败原因:

E       KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "producer.type""}

batch.size也有同样的错误,你能告诉我在哪里以及如何设置异步和批处理大小,或者以任何其他方式将批量消息传递给kafka 0.9.3.1吗

ny6fqffe

ny6fqffe1#

您正在混合名为“confluent kafka python”的融合python客户端
http://docs.confluent.io/3.2.1/clients/confluent-kafka-python/
与“KafkaPython”客户端
https://github.com/dpkp/kafka-python
这是两个具有不同api的不同客户机。

j0pj023g

j0pj023g2#

默认情况下,所有生产者都是异步的。底层librdkafka库不支持producer.type和batch.size配置。
因此,请使用可用的配置batch.num.messages或message.max.bytes。

相关问题