Getting topic and partition offsets

x6h2sr28  posted on 2021-06-07  in Kafka

Exception in thread "main" java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
    at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
    at cmb.sparkstream.kafka.KafkaOffsetTool.getTopicOfsets(KafkaOffsetTool.java:47)
    at cmb.sparkstream.LogClassify.main(LogClassify.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My code is:

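// Imports for the legacy kafka.javaapi client used below (Guava supplies ImmutableMap).
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public static Map<TopicAndPartition, Long> getTopicOffsets(String zkServers, String topic) {
    Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();

    // Each entry is expected in host:port form.
    for (String zkserver : zkServers.split(",")) {
        String[] hostAndPort = zkserver.split(":");
        SimpleConsumer simpleConsumer = new SimpleConsumer(hostAndPort[0],
                Integer.valueOf(hostAndPort[1]), Consts.getKafkaConfigBean().getDuration(), 1024,
                "consumser");

        // Ask this server for the topic's partition metadata.
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
        TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);

        for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
            for (PartitionMetadata part : metadata.partitionsMetadata()) {
                Broker leader = part.leader();
                if (leader != null) {
                    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId());

                    // Request the latest offset(s) for this partition.
                    PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(
                            kafka.api.OffsetRequest.LatestTime(), 10000);
                    OffsetRequest offsetRequest = new OffsetRequest(
                            ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
                            kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
                    OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
                    if (!offsetResponse.hasError()) {
                        // The first returned offset is the most recent one.
                        long[] offsets = offsetResponse.offsets(topic, part.partitionId());
                        retVals.put(topicAndPartition, offsets[0]);
                    }
                }
            }
        }
        simpleConsumer.close();
    }
    return retVals;
}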

ia2d9nvy1#

I think you may be overcomplicating things. Use org.apache.kafka.clients.consumer.KafkaConsumer (referred to as consumer here) and do something like

val partitions = consumer.partitionsFor(topic).map[new TopicPartition(topic,it.partition)]
consumer.assign(partitions)
consumer.seekToEnd(partitions)
val offsets = partitions.map[ it -> consumer.position(it)]
println(offsets)

You will get a result like this:
[topicname-8->1917258, topicname-2->1876810, topicname-5->1857012, topicname-4->3844, topicname-7->4043972, topicname-1->1811078, topicname-9->12217819, topicname-3->3844, topicname-6->1430021, topicname-0->2808969]
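
For readers who want the same steps in Java (look up the partitions, assign them, seek to the end, read each position), a minimal sketch might look like the one below. It assumes a kafka-clients release in which assign and seekToEnd accept a collection. The class name LatestOffsets, the helper fromBroker, and the byte-array deserializers are illustrative choices, not part of the original answer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical helper class; the name is not from the original answer.
public class LatestOffsets {

    /** Returns the position after seeking to the end of every partition of the topic. */
    public static Map<TopicPartition, Long> latestOffsets(Consumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos != null) { // guard in case no metadata is returned for the topic
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
        }

        // Manual assignment: no consumer group or subscription is needed.
        consumer.assign(partitions);
        consumer.seekToEnd(partitions);

        Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
        for (TopicPartition tp : partitions) {
            // position() resolves the pending seek and returns the next offset to be read.
            offsets.put(tp, consumer.position(tp));
        }
        return offsets;
    }

    /** Convenience wrapper; bootstrapServers is a broker list such as "host:9092". */
    public static Map<TopicPartition, Long> fromBroker(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            return latestOffsets(consumer, topic);
        }
    }
}
```

Calling LatestOffsets.fromBroker with a broker address and a topic name returns a map from each partition to its current end position. Note that the address passed in must be that of a Kafka broker, not of ZooKeeper.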
