从javaapi创建kafka主题

xn1cxnb4  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(481)

我正在尝试使用javaapi创建一个kafka主题,但是get leader不可用。
代码:

int partition = 0;
        ZkClient zkClient = null;
        try {
            String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);

            String topicName = "mdmTopic5";
            int noOfPartitions = 2;
            int noOfReplication = 1;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkClient, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }

错误:

[2017-10-19 12:14:42,263] WARN Error while fetching metadata with correlation id 1 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,370] WARN Error while fetching metadata with correlation id 3 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,479] WARN Error while fetching metadata with correlation id 4 : {mdmTopic5=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Kafka0.11.0.1是否支持管理。???请让我知道如何在此版本中创建主题。
提前谢谢。

k5hmc34c

k5hmc34c1#

一般 LEADER NOT AVAILABLE 指向网络问题,而不是代码问题。尝试: telnet host port 查看是否可以从计算机连接到所有必需的主机/端口。
然而,最新的方法是使用 BOOTSTRAP_SERVERS 创建主题时。
使用scala的主题创建代码的工作版本如下:
导入所需的 kafka-clients 使用sbt。

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1",
"org.apache.kafka" %% "kafka" % "1.0.0")

在scala中创建主题的代码:

import java.util.Arrays
import java.util.Properties

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

class CreateKafkaTopic {
  def create(): Unit = {
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")

    val localKafkaAdmin = AdminClient.create(config)

    val partitions = 3
    val replication = 1.toShort
    val topic = new NewTopic("integration-02", partitions, replication)
    val topics = Arrays.asList(topic)

    val topicStatus = localKafkaAdmin.createTopics(topics).values()
    //topicStatus.values()
    println(topicStatus.keySet())
  }

}

希望有帮助。

hl0ma9xz

hl0ma9xz2#

由于kafka0.11有一个合适的管理api来创建(和删除)主题,我建议使用它,而不是直接连接到zookeeper。
请参见adminclient.createtopics():http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/adminclient.html#createtopics(java.util.collection)集合

h79rfbju

h79rfbju3#

adminutils api已被弃用。有一个新的api adminzkclient,我们可以用它来管理kafka服务器中的主题。有关详细信息,请参阅此链接https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

相关问题