我正在使用scala、spark和kafka。我有两个问题。
1.如何确认kafka broker(服务器)中存在该主题?
2.如何确认kafka服务器(引导服务器)是否正在运行?
object kafkaProducer extends App {
def sendMessages(): Unit = {
//define topic
val topic = "spark-topic" // how can i confirm this topic is exist in kafka server ?
//define producer properties
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "KafkaProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
//create producer instance
val kafkaProducer = new KafkaProducer[String, JsonNode](props)
//create object mapper
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
//mapper Json object to string
def toJson(value: Any): String = {
mapper.writeValueAsString(value)
}
//send producer message
val jsonstring =
s"""{
| "id": "0001",
| "name": "Peter"
|}
""".stripMargin
val jsonNode: JsonNode = mapper.readTree(jsonstring)
val rec = new ProducerRecord[String, JsonNode](topic, jsonNode)
kafkaProducer.send(rec)
//println(rec)
}
}
1条答案
按热度按时间zi8p0yeb1#
1) 检查主题是否存在的推荐方法是使用adminclient api。
你可以用
listTopics()
或者describeTopics()
.2) 假设您对集群没有任何访问权限(检查度量或活动性探测),检查集群是否正在运行的唯一方法是尝试连接/使用它。
使用adminclient,您可以使用
describeCluster()
,例如,尝试检索群集的状态。