java—为什么这个使用者(在servlet上运行)不接收来自kafka主题的任何消息?

6xfqseft  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(287)

我在servlet上设置了一个kafka消费者。我阅读了kafka主题,并希望使用servlet的doget方法打印最新的值。
但是如果我把狗送回去我只会得到一个空的。。。
我不确定,但我发现另一个帖子,有人提到,这可能是因为没有连接到zookeeper服务器。所以我试着解决这个问题 zookeeper.connection 消费者的财产;即使已经过时了。
希望有人能给我一些建议。
我的servlet代码:

public class KafkaServlet extends HttpServlet implements Runnable {

public String kafkaMessage;
private Properties props;
private KafkaConsumer<String, String> consumer;
Thread Trans;

/**

* setting up the properties and other consumer using the constructor of KafkaServlet
* /

public KafkaServlet() {
    props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); //replace 'localhost' with the actual IP to get it working.
    props.put("zookeeper.connect", "localhost:2181");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("kafkaTopic"));
}

/**

* setup the thread and run it using the init-method
* /

@Override
public void init (ServletConfig config) throws ServletException {
    super.init(config);
    Trans = new Thread(this);
    Trans.setPriority(Thread.MIN_PRIORITY);
    Trans.start();
}

/**

* implement the doPost-Method if we want to use it
* /

public void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
    //empty -- nothing to post here
}

/**

* doGet will get print out our returned message from kafka
* /

@Override
public void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

res.setContentType("text/html");
PrintWriter out = res.getWriter();
String title = "Reading Parameters from kafkaTopic";

out.println("<HTML>" +
            "<BODY>\n" +
            "<H1 ALIGN=CENTER>" + title + "</H1>\n" +
            "<UL>\n" +
            "  <LI>MESSAGE: "
            + kafkaMessage + "\n" +
            "</UL>\n" +
            "</BODY></HTML>");
}

/**

* thread, which grabs the messages from kafka and stores the latest one in "kafkaMessage"
* /

@Override
public void run() {
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                //KafkaMessage.setMessage(record.value());
                kafkaMessage = record.value();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}}

编辑:“localhost.log”中的最新日志文件: 19-Dec-2016 13:42:42.773 INFO [localhost-startStop-2] org.apache.catalina.core.ApplicationContext.log SessionListener: contextDestroyed() 19-Dec-2016 13:42:42.774 INFO [localhost-startStop-2] org.apache.catalina.core.ApplicationContext.log ContextListener: contextDestroyed() 19-Dec-2016 13:42:54.742 INFO [localhost-startStop-1] org.apache.catalina.core.ApplicationContext.log ContextListener: contextInitialized() 19-Dec-2016 13:42:54.745 INFO [localhost-startStop-1] org.apache.catalina.core.ApplicationContext.log SessionListener: contextInitialized()

igetnqfo

igetnqfo1#

我想出来了:问题是,为什么我没有收到任何消息是线路 props.put("bootstrap.servers", "localhost:9092"); .
我不得不把“localhost”改成我虚拟机的实际ip地址。这有点奇怪,因为我配置的所有其他属性(在ApacheStorm上;Kafka;另一个servlet(包含一个kafka生产者)与“localhost”配合得很好。

相关问题