kafka使用topiccommand创建主题

r6hnlfcb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(338)

我想用java创建一个主题。这是我的密码。

String s = "--topic pt8 --create --zookeeper 10.11.6.52:2181 --replica-assignment 7";
String[] args2 = s.split(" ");
TopicCommand.main(args2);

但有一个错误:
[zkclient-eventthread-14-10.11.6.52:2181]信息o.i.z.zkeventthread-启动zkclient事件线程。
[main]info o.i.z.zkclient-正在等待keeper状态syncconnected[main eventthread]info o.i.z.zkclient-zookeper状态已更改(syncconnected)
执行topic命令时出错:java.lang.exceptionininitializererror
[zkclient-eventthread-14-10.11.6.52:2181]信息o.i.z.zkeventthread-终止zkclient事件线程。 --list --zookeeper 10.11.6.52:2181 可以得到结果。 --delete --zookeeper 10.11.6.52:2181 --topic pt7 得到 Error while executing topic command : null .
我的pom.xml:

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>

使用管理员:

ZkClient  zkClient = new ZkClient("10.11.6.52:2181", 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
AdminUtils.createTopic(zkUtils, "pt8", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

错误:
线程“main”kafka.admin.adminoperationexception中出现异常:java.lang.ExceptionInInitializeError

dsekswqp

dsekswqp1#

不要使用shell命令并尝试从java执行它,而是使用kafka管理客户端api,它应该与kafka 0.11+一起工作。
下面是一段代码片段:

void setUpKafkaTopics(KafkaAdminClient kafkaAdminClient) throws ExecutionException, InterruptedException {
  final Map<String, Integer> topics = new HashMap<>();
  topics.put(topicName, numOfPartitions);
  kafkaAdminClient.createTopics(topics, getTopicConfig(), replicationFactor);
}

Map<String, String> getTopicConfig() {
  Map<String, String> topicConfiguration = new HashMap<>();
  topicConfiguration.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
     Boolean.FALSE.toString());
  topicConfiguration.put(TopicConfig.CLEANUP_POLICY_CONFIG,
     TopicConfig.CLEANUP_POLICY_DELETE);
  topicConfiguration.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
     KAFKA_TOPIC_COMPRESSION_TYPE);
  topicConfiguration.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
     KAFKA_TOPIC_MIN_IN_SYNC_REPLICAS.toString()); 
  topicConfiguration.put(TopicConfig.RETENTION_MS_CONFIG,
     KAFKA_TOPIC_RETENTION_MS.toString()); 
  return topicConfiguration;
}

相关问题