我现在正在使用kafka python==2.0.1,
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
topic_name = "retention_test"
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=3, topic_configs={'log.retention.hours' : '100'})
response = admin.create_topics([topic])
print(response)
但无法创建主题,并引发以下错误-
raise error_type(
kafka.errors.InvalidConfigurationError: [Error 40] InvalidConfigurationError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='retention_test', num_partitions=1, replication_factor=3, replica_assignment=[], configs=[(config_key='log.retention.hours', config_value=100)])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='retention_test', error_code=40, error_message='Unknown topic config name: log.retention.hours')])
1条答案
按热度按时间roejwanj1#
克里希纳,
据我所知,您给“newtopics”的“newtopics”对象不允许topic属性“log.retention.hours”。log.retention.hours是代理的属性,在创建主题时用作默认值。更改主题的配置时,应指定主题级属性。
日志保留时间的主题级属性是retention.ms。
参考文献(1)https://kafka-python.readthedocs.io/en/master/apidoc/kafkaadminclient.html (2) http://kafka.apache.org/081/documentation.html#topic-配置(3)交叉引用:在运行时更改kafka保留期