无法使用kafka管理客户端api创建带有所需分区的kafka主题

lmvvr0a8  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(312)

我正在使用kafka管理客户端api来创建主题。正在创建主题,但是在默认情况下,主题是使用1个分区创建的。api不接受提供的可配置值。不知道我用的对不对。
注意:主题创建是在代理级别启用的。主题也在创建中,但是它是用分区1创建的。

NewTopic newTopic = new NewTopic(TOPIC_NAME, 10, (short) 1);
        CreateTopicsResult createTopicsResult = null;
        try {
            createTopicsResult = KafkaAdminClient.create(getAdminProperties()).createTopics(Collections.singletonList(newTopic));
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

不过,我可以使用kafka管理客户端api增加先前创建的主题的分区

efzxgjgh

efzxgjgh1#

我尝试使用以下代码复制此内容,但没有成功:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AdminApiDemo {

    private static final String BOOTSRAP_SERVER = "localhost:9092";
    private static final String TOPIC_NAME = "demoTopic";
    private static final int NUM_PARTITIONS = 3;
    private static final short NUM_REPLICAS = 1;

    private final AdminClient adminClient;

    private AdminApiDemo(Properties properties) {
        this.adminClient = KafkaAdminClient.create(properties);
    }

    public static void main(String[] args) {
        final Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSRAP_SERVER);

        new AdminApiDemo(properties).createTopic(TOPIC_NAME, NUM_PARTITIONS, NUM_REPLICAS);
    }

    private void createTopic(String topicName, int numPartitions, short numReplicas) {
        try {
            final NewTopic newTopic = new NewTopic(topicName, numPartitions, numReplicas);
            final CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
            result.values().get(topicName).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

kafka-topics --describe 显示以下内容:

root@kafka:/# kafka-topics --bootstrap-server localhost:9092 --describe --topic demoTopic
Topic:demoTopic PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: demoTopic    Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: demoTopic    Partition: 1    Leader: 1   Replicas: 1 Isr: 1
    Topic: demoTopic    Partition: 2    Leader: 1   Replicas: 1 Isr: 1

我想,好吧,如果这个主题在创作之前就存在了呢,但是我又得到了一个答案 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'demoTopic' already exists. 所以这也不是你的案子。
我知道这不是一个“真实”的答案,这解决了任何问题,对此我很抱歉。但我希望它能有所帮助。也许其他人可以用这个在他的环境中重现,并“看到”问题。

相关问题