Go语言 如何将复制因子添加到主题?

zpjtge22  于 2023-04-27  发布在  Go
关注(0)|答案(1)|浏览(120)
conf := &kafka.ConfigMap{
        "bootstrap.servers": "127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094",
        "min.insync.replicas":3,
    }

producer, err := kafka.NewProducer(conf)
if err != nil {
       log.Println("Error initializing producer: ", err)
}

我使用www.example.com包以这种方式初始化了Kafka producerconfluent.io。

producerErr := p.Producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{
                Topic: &topic,
                Partition: kafka.PartitionAny,
            },
            Key:            []byte("Message"),
            Value:          msg,
        }, nil)

这是一个主题的生成,但是,如何为主题设置复制因子。或者建议一些方法设置代理和复制因子的数量。

9jyewag0

9jyewag01#

在使用生产者之前,你需要一个现有的主题。生产者自己不创建主题,因此不能设置复制因子(或分区,或其他主题配置)。
您需要使用AdminClient来设置new主题的副本(和其他设置

// Create a new AdminClient.
    // AdminClient can also be instantiated using an existing
    // Producer or Consumer instance, see NewAdminClientFromProducer and
    // NewAdminClientFromConsumer.
    a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
...
    results, err := a.CreateTopics(
        ctx,
        // Multiple topics can be created simultaneously
        // by providing more TopicSpecification structs here.
        []kafka.TopicSpecification{{
            Topic:             topic,
            NumPartitions:     numParts,
            ReplicationFactor: replicationFactor}},
...

https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/admin_create_topic/admin_create_topic.go
将副本“添加”到 * 现有 * 主题的唯一方法是使用Kafka CLI命令中包含的kafka-reassign-partitions.sh

相关问题