java保证向kafka集群传递多条消息

0ve6wy6x  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(399)

如果我将一行中的多条消息发布到kafka集群(使用新的producerapi),我会得到一个 Future 每封邮件的制作人。
现在,假设我已经将我的生产者配置为 max.in.flight.requests.per.connection = 1 以及 retries > 0 我能不能等到最后一个未来,并确保之前的所有产品都已交付(并按顺序)?或者我需要等待所有的未来?在代码中,我可以这样做:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
Future<?> f = null;
for(MessageType message : messages){
  f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue());
}
try {
  f.get();
} catch(ExecutionException e) {
  //handle exception
}

而不是这样:

Producer<String, String> producer = new KafkaProducer<>(myConfig);
List<Future<?>> futureList = new ArrayList<>();
for(MessageType message : messages){
  futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()));
}
try {
  for(Future<?> f : futureList) {
    f.get();
  }
} catch(ExecutionException e) {
  //handle exception
}

请确保,如果此处没有捕获任何内容(来自第一个片段):

try {
  f.get();
} catch(ExecutionException e) {

然后,我的所有消息都按顺序存储在集群中(无论生产者是否在引擎盖下执行了任何重试),如果出现问题,那么我将在那里得到一个异常,即使它不是第一次遇到问题的最后一个将来(我正在等待)?
还有什么奇怪的案子要注意吗?

a2mppw5e

a2mppw5e1#

更进一步说,您还可以在完成循环中所有消息的发送后调用flush()。此调用将阻止,直到所有的未来都已完成,因此在此之后,您可以检查任何异常的未来。你需要抓住所有的未来才能做到这一点。
另一种方法是对发送使用回调并存储任何返回的异常,如下所示。再次使用flush可以确保在检查异常之前完成所有发送。

Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();

for(MessageType message : messages){
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

producer.flush();

if (!exceptionList.isEmpty()) {
  // do stuff
}
yvt65v4c

yvt65v4c2#

您可以这样做,但前提是a)将重试次数设置为无限(或实际上无限)并且b)如果遇到不可重试的异常,则可以丢弃数据。
再解释一下,Kafka有两类例外。可重试异常是失败,如果再次运行它,您可能会成功。例如 NotEnoughReplicasException 指示复制副本比您需要的少,因此请求被拒绝。但是,如果失败的代理重新联机,那么您可能有足够的副本,恢复到良好状态,并且如果您再次发送请求,请求将成功。相比之下,一个 SerializationException 是不可重试的,因为我们没有理由相信,如果你尝试再次序列化的结果将是不同的。
生产者重试只适用于您遇到不可重试异常的点。因此,如果您从未达到上述任何一项,请使用无限次重试,并使用您提到的其他设置,一旦最终的未来得到解决,订购和成功交付将得到保证。但是,由于您可能会遇到不可重试的异常,因此处理每个未来(或回调)并确保在请求失败时至少记录一些内容肯定会更好。

相关问题