kafkaconsumer在kafkaserver上出错(版本0.9.0.1)

fv2wmkja  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(299)

我试图用kafka客户端库(0.9.0.1)测试生产者、消费者。代理(0.9.0.1)正在服务器上运行,我已经测试了kafkaproducer,没有问题。但是当我测试kafkaconsumer进行轮询时,代理会发出一条错误消息。
[2016-03-18 13:44:19,129]关闭/172.26.132.149的套接字时出错,原因是错误(kafka.network.processor)kafka.common.kafkaexception:kafka.api.requestkeys$.deserializerforkey(requestkeys)处的请求类型10错误。scala:57)在kafka.network.requestchannel$request.(requestchannel。scala:53)在kafka.network.processor.read(socketserver。scala:353)在kafka.network.processor.run(socketserver。scala:245)
消费者测试代码如下。

class ConsumerRunner implements Runnable{
    private KafkaConsumer<String,String> consumer;
    private String topic;
    public ConsumerRunner(String topic,Properties props){
        consumer = new KafkaConsumer<String,String>(props);
        this.topic = topic;
        consumer.subscribe(Arrays.asList(this.topic));
    }
    public void run() {
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        }
    }

}

我猜轮询请求包含错误的请求类型键,但是当我检查kafka核心源时,我重新确认请求类型键“10”被定义为“groupcoordinatory”。我在“kafka.network.requestchannel.scala”里发现了可疑代码

val requestObj =
      if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
        RequestKeys.deserializerForKey(requestId)(buffer)

      else
        null

测试使用者还显示错误消息
java.io.eofexception:org.apache.kafka.common.network.networkreceive.readfromreadablechannel(networkreceive)处为空。java:83)在org.apache.kafka.common.network.networkreceive.readfrom(networkreceive。java:71)在org.apache.kafka.common.network.kafkachannel.receive(kafkachannel。java:153)在org.apache.kafka.common.network.kafkachannel.read(kafkachannel。java:134)在org.apache.kafka.common.network.selector.poll(selector。java:286)在org.apache.kafka.clients.networkclient.poll(networkclient。java:256)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.clientpoll(consumernetworkclient。java:320)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:213)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:193)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:163)在org.apache.kafka.clients.consumer.internals.abstractcoordinator.ensurecoordinatorknown(abstractcoordinator)。java:180)在org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer。java:886)在org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer。java:853)在com.medialog.mdt.kafka.kafkatest$consumerthread.run(kafkatest。java:61)
有人有主意吗?是我的问题吗?或者其他人?请帮帮我。谢谢您。

roejwanj

roejwanj1#

民库
不确定您是否只是试图为0.9Kafka代码创建一个消费者,或者您的Kafka消息中有导致此问题的特定内容,是否可以共享更多详细信息。
但是如果您只是想为0.9编写一个kafka消费者,那么在kafka 0.9中就有了新的消费者api。如果您愿意使用新的消费者api,请查看此示例https://github.com/sdpatil/kafkaapiclient/blob/master/src/main/java/com/spnotes/kafka/simple/consumer.java 样品。
苏尼尔

相关问题