我需要设置Kafka生产者发送一批不是由味精味精,而是批量进口500毫克。我查过了https://github.com/dpkp/kafka-python/issues/479 试过了 producer.send_messages(topic, *message)
但它失败了:
> producer.send_messages('event_connector_mt', *load_entries)
E AttributeError: 'cimpl.Producer' object has no attribute 'send_messages'
我也试着通过了 producer.produce(topic, *message)
失败原因:
producer.produce('event_connector_mt', *load_entries)
E TypeError: function takes at most 8 arguments (501 given)
所以我深入研究发现,我必须在producer配置中将类型设置为async,batch.size设置为大于default,但是当我尝试如下配置时:
from confluent_kafka import Consumer, Producer
producer = Producer(**{'bootstrap.servers': KAFKA_BROKERS,
'queue.buffering.max.messages': 1000000,
'batch.num.messages': 500,
'batch.size': 19999,
'producer.type': 'async'
})
失败原因:
E KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "producer.type""}
batch.size也有同样的错误,你能告诉我在哪里以及如何设置异步和批处理大小,或者以任何其他方式将批量消息传递给kafka 0.9.3.1吗
2条答案
按热度按时间ny6fqffe1#
您正在混合名为“confluent kafka python”的融合python客户端
http://docs.confluent.io/3.2.1/clients/confluent-kafka-python/
与“KafkaPython”客户端
https://github.com/dpkp/kafka-python
这是两个具有不同api的不同客户机。
j0pj023g2#
默认情况下,所有生产者都是异步的。底层librdkafka库不支持producer.type和batch.size配置。
因此,请使用可用的配置batch.num.messages或message.max.bytes。