{kafkav0.10.1}如果异步生产者apachekafkax中发生任何错误,则生成消息大约需要1分钟

368yc8dk  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(306)

我正在使用异步生产者向代理发送消息。如果我的回调中没有任何错误,我发现生成消息大约需要0.3秒。但当我低于错误[1]时,我发现生成消息需要60秒。但我没有看到任何信息丢失。所有消息在代理中都可用。是什么导致了这个错误?我在每50条信息中都能看到这种延迟。当我遇到这个错误时,如何提高生产者的性能?
代码;

producer.send(new ProducerRecord(topic, this), new ProducerCallback ());

  private class ProducerCallback implements Callback {

        @Override

        public void onCompletion(RecordMetadata recordMetadata, Exception ex) {

            if (ex != null) {

              log.error("Error when publishing messages to the topic. Topic :"+ recordMetadata.topic(),ex);

            }

        }

    }

生产者财产

acks=1

linger.ms=10

batch.size=51200

bootstrap.servers=aukk1.xx.com\:9092,aukk2.xx.com\:9092,aukk3.xx.com\:9092

key.serializer=org.apache.kafka.common.serialization.StringSerializer

value.serializer=com.xx.KafkaPayloadSerializer

[1]

04:51:20,025 ERROR [org.apache.kafka.clients.producer.internals.RecordBatch] (kafka-producer-network-thread | producer-673) Error executing user-provided callback on message for topic-partition RAW_XML1harveyzhu-1:: java.lang.NullPointerException
       at com.lxx.kafkamodels.KafkaPayload$ProducerCallback.onCompletion(KafkaPayload.java:204)
       at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
       at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
       at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:205)
       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
       at java.lang.Thread.run(Thread.java:745)
du7egjpx

du7egjpx1#

如果ex不为null,则第一个参数“recordmetadata”为null,因此在调用recordmetadata.topic()时会看到npe

bvjxkvbb

bvjxkvbb2#

使用以下代码生成记录元数据:

if (ex == null) {  
            logger.info("Successfully received the details as: \n" +  
                    "Topic:" + recordMetadata.topic() + "\n" +  
                    "Partition:" + recordMetadata.partition() + "\n" +  
                    "Offset" + recordMetadata.offset() + "\n" +  
                    "Timestamp" + recordMetadata.timestamp());  
                      }  

         else {  
            logger.error("Can't produce,getting error",e);

相关问题