线程 "main" 中的异常:org.I0Itec.zkclient.exception.ZkTimeoutException

7d7tgy0s  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(229)

我正在尝试运行 Kafka 消费者。这是我的 Java 代码:

package com.consumer.demo;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumer 
{
  private final ConsumerConnector consumer;
  private final String topic;
  private ExecutorService executor;

  public KafkaConsumer(String a_zookeeper, String a_groupId, String a_topic) 
  {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig(a_zookeeper, a_groupId));
    this.topic = a_topic;
  }

  public void shutdown() 
  {
    if (consumer != null) consumer.shutdown();
    if (executor != null) executor.shutdown();
  }

  public void run(int a_numThreads) 
  {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads //
    executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages //
    int threadNumber = 0;
    for (final KafkaStream stream : streams)
    {
      executor.submit(new ConsumerTest(stream, threadNumber));
      threadNumber++;
    }
  }

  private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)
  {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");
    return new ConsumerConfig(props);
  }

  public static void main(String[] args)
  {
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);
    KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topic);
    example.run(threads);
    try { 
      Thread.sleep(10000);
    } catch (InterruptedException ie)
    { }
    example.shutdown();
    }
  }
}

消费者的运行时参数为:10.25.3.207:2181 vulab123 1
我得到以下错误:

Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 4000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
at com.consumer.demo.KafkaConsumer.<init>(KafkaConsumer.java:18)
at com.consumer.demo.KafkaConsumer.main(KafkaConsumer.java:58)

关于如何解决这个问题有什么建议吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题