kafka 1.0.0管理客户端无法使用eofexception创建主题

ldioqlga  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(335)

我希望使用1.0.0kafka管理客户机以编程方式在代理上创建一个主题。我正好在用scala。我尝试使用以下代码在kafka代理上创建一个主题,或者只是列出可用的主题

import org.apache.kafka.clients.admin.{AdminClient, ListTopicsOptions, NewTopic}
import scala.collection.JavaConverters._

val zkServer = "localhost:2181"
val topic = "test1"

val zookeeperConnect = zkServer
val sessionTimeoutMs = 10 * 1000
val connectionTimeoutMs = 8 * 1000

val partitions = 1
val replication:Short = 1
val topicConfig = new Properties() // add per-topic configurations settings here

import org.apache.kafka.clients.admin.AdminClientConfig
val config = new Properties
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, zkServer)
val admin = AdminClient.create(config)

val existing = admin.listTopics(new ListTopicsOptions().timeoutMs(500).listInternal(true))
val nms = existing.names()
nms.get().asScala.foreach(nm => println(nm)) // nms.get() fails

val newTopic = new NewTopic(topic, partitions, replication)
newTopic.configs(Map[String,String]().asJava)
val ret = admin.createTopics(List(newTopic).asJavaCollection)
ret.all().get() // Also fails
admin.close()

使用任一命令,zookeeper(3.4.10)端抛出一个eofexception并关闭连接。在调试zookeeper端本身时,似乎无法反序列化管理客户端正在发送的消息(尝试读取的字节已用完)
有没有人能够让1.0.0Kafka管理客户端用于创建或列出主题?

gwbalxhn

gwbalxhn1#

adminclient直接连接到kafka,不需要访问zookeeper。
你需要设置 AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG 指向你的Kafka经纪人(例如 localhost:9092 )而不是Zookeeper。

相关问题