java—收集未在kafka上成功编写的消息

t5zmwmid  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(398)

我正在读一份文件,把Kafka的每一张唱片都扔了。这是我的制作人代码:

public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
     try (BufferedReader bf = getBufferedReader(filePath, encoding);
                 KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
                String line;
                long count = 0;
                while ((line = bf.readLine()) != null) {
                    count++;
                    producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                        if(e != null){
                            e.printStackTrace();
                            //write record to some file.
                        }
                    });
                }
                producer.flush();
                CustomLogger.log("Done producing data messages. Total no of records produced:" + count);
            } catch (IOException e) {
                Throwables.propagate(e);
            }
}
 private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServer);
        properties.put("key.serializer", StringSerializer.class.getCanonicalName());
        properties.put("value.serializer", StringSerializer.class.getCanonicalName());
        properties.put("acks", "-1");
        properties.put("retries", 4);
        return new KafkaProducer<>(properties);
    }

private BufferedReader getBufferedReader(String filePath, String encoding) throws UnsupportedEncodingException, FileNotFoundException {
    return new BufferedReader(new InputStreamReader(new FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));
}

根据我们的基本测试,由于timeoutexception,生成消息可能会失败。但是根据回调timeoutexception的官方文档,它是一个可重试的异常。意味着在下次重试时可能会产生此消息。因此,如果在回调中发现timeoutexception,我不能认为记录发送失败。有没有什么切实可行的方法,我可以肯定地说,记录发送失败,并记录在单独的文件?

eoxn13cs

eoxn13cs1#

我简单地看了一下代码,不认为您需要在这里区分可检索和不可检索的异常,因为这在kafkaproducer中已经发生过。
当您将producer配置为retries值大于1时,它将在将异常返回给您之前,按您告诉它的次数重新发送任何因可重试异常而失败的消息(批处理)。
所以基本上,你得到的任何信息都有一个例外,制作人已经放弃了。
看看completebatch&canretry中的代码以确认我的理解,但我个人认为这种行为是有意义的。

相关问题