simpleconsumer在kafka中已被弃用,org.apache.kafka.clients.consumer.kafkaconsumer是替代品。但是,它没有send(…)函数。如何使用新的kafkaconsumer重写下面的代码?
import scala.concurrent.duration._
import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer
....
val consumer = new SimpleConsumer(
host = "127.0.0.1",
port = 9092,
soTimeout = 2.seconds.toMillis.toInt,
bufferSize = 1024,
clientId = "health-check")
// this will fail if Kafka is unavailable
consumer.send(new TopicMetadataRequest(Nil, 1))
2条答案
按热度按时间yyyllmsg1#
您可以使用
.partitionsFor
以及.listTopics
64jmpszr2#
方法没有直接的替代品,这取决于你想做什么。如果您需要所有分区信息,那么在新的api中有一个用于该使用者的方法.partitionfor(topic)