conf := &kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094",
"min.insync.replicas":3,
}
producer, err := kafka.NewProducer(conf)
if err != nil {
log.Println("Error initializing producer: ", err)
}
我使用www.example.com包以这种方式初始化了Kafka producerconfluent.io。
producerErr := p.Producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte("Message"),
Value: msg,
}, nil)
这是一个主题的生成,但是,如何为主题设置复制因子。或者建议一些方法设置代理和复制因子的数量。
1条答案
按热度按时间9jyewag01#
在使用生产者之前,你需要一个现有的主题。生产者自己不创建主题,因此不能设置复制因子(或分区,或其他主题配置)。
您需要使用
AdminClient
来设置new主题的副本(和其他设置https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/admin_create_topic/admin_create_topic.go
将副本“添加”到 * 现有 * 主题的唯一方法是使用Kafka CLI命令中包含的
kafka-reassign-partitions.sh