我正在读一份文件,把Kafka的每一张唱片都扔了。这是我的制作人代码:
public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
try (BufferedReader bf = getBufferedReader(filePath, encoding);
KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
String line;
long count = 0;
while ((line = bf.readLine()) != null) {
count++;
producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
if(e != null){
e.printStackTrace();
//write record to some file.
}
});
}
producer.flush();
CustomLogger.log("Done producing data messages. Total no of records produced:" + count);
} catch (IOException e) {
Throwables.propagate(e);
}
}
private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer", StringSerializer.class.getCanonicalName());
properties.put("value.serializer", StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 4);
return new KafkaProducer<>(properties);
}
private BufferedReader getBufferedReader(String filePath, String encoding) throws UnsupportedEncodingException, FileNotFoundException {
return new BufferedReader(new InputStreamReader(new FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));
}
根据我们的基本测试,由于timeoutexception,生成消息可能会失败。但是根据回调timeoutexception的官方文档,它是一个可重试的异常。意味着在下次重试时可能会产生此消息。因此,如果在回调中发现timeoutexception,我不能认为记录发送失败。有没有什么切实可行的方法,我可以肯定地说,记录发送失败,并记录在单独的文件?
1条答案
按热度按时间eoxn13cs1#
我简单地看了一下代码,不认为您需要在这里区分可检索和不可检索的异常,因为这在kafkaproducer中已经发生过。
当您将producer配置为retries值大于1时,它将在将异常返回给您之前,按您告诉它的次数重新发送任何因可重试异常而失败的消息(批处理)。
所以基本上,你得到的任何信息都有一个例外,制作人已经放弃了。
看看completebatch&canretry中的代码以确认我的理解,但我个人认为这种行为是有意义的。