如果我将一行中的多条消息发布到kafka集群(使用新的producerapi),我会得到一个 Future
每封邮件的制作人。
现在,假设我已经将我的生产者配置为 max.in.flight.requests.per.connection = 1
以及 retries > 0
我能不能等到最后一个未来,并确保之前的所有产品都已交付(并按顺序)?或者我需要等待所有的未来?在代码中,我可以这样做:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
f.get();
} catch(ExecutionException e) {
//handle exception
}
而不是这样:
Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
for(Future<?> f : futureList) {
f.get();
}
} catch(ExecutionException e) {
//handle exception
}
请确保,如果此处没有捕获任何内容(来自第一个片段):
try {
f.get();
} catch(ExecutionException e) {
然后,我的所有消息都按顺序存储在集群中(无论生产者是否在引擎盖下执行了任何重试),如果出现问题,那么我将在那里得到一个异常,即使它不是第一次遇到问题的最后一个将来(我正在等待)?
还有什么奇怪的案子要注意吗?
2条答案
按热度按时间a2mppw5e1#
更进一步说,您还可以在完成循环中所有消息的发送后调用flush()。此调用将阻止,直到所有的未来都已完成,因此在此之后,您可以检查任何异常的未来。你需要抓住所有的未来才能做到这一点。
另一种方法是对发送使用回调并存储任何返回的异常,如下所示。再次使用flush可以确保在检查异常之前完成所有发送。
yvt65v4c2#
您可以这样做,但前提是a)将重试次数设置为无限(或实际上无限)并且b)如果遇到不可重试的异常,则可以丢弃数据。
再解释一下,Kafka有两类例外。可重试异常是失败,如果再次运行它,您可能会成功。例如
NotEnoughReplicasException
指示复制副本比您需要的少,因此请求被拒绝。但是,如果失败的代理重新联机,那么您可能有足够的副本,恢复到良好状态,并且如果您再次发送请求,请求将成功。相比之下,一个SerializationException
是不可重试的,因为我们没有理由相信,如果你尝试再次序列化的结果将是不同的。生产者重试只适用于您遇到不可重试异常的点。因此,如果您从未达到上述任何一项,请使用无限次重试,并使用您提到的其他设置,一旦最终的未来得到解决,订购和成功交付将得到保证。但是,由于您可能会遇到不可重试的异常,因此处理每个未来(或回调)并确保在请求失败时至少记录一些内容肯定会更好。