关闭后如何重新连接Kafka制作人?

jdzmm42g  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(385)

我有一个多线程应用程序,它使用producer类生成消息,早些时候我使用下面的代码为每个请求创建producer。其中kafkaproducer是用每个请求新建的,如下所示:

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        isValidMsg[0] = false;
                        exception.printStackTrace();
                        saveOrUpdateLog(msgBean, producerType, exception);
                        logger.error("ERROR:Unable to produce message.",exception);
                    }
                }
            });
producer.close();

然后我读了kafka关于producer的文档,知道我们应该使用单个producer示例来获得良好的性能。
然后我在一个单例类中创建了kafkaproducer的一个示例。
现在我们应该在何时何地关闭生产商。显然,如果我们在第一次发送请求后关闭生产者,它不会发现生产者重新发送消息,因此抛出:

java.lang.IllegalStateException: Cannot send after the producer is closed.

或者我们如何在关闭后重新连接到生产商。问题是如果程序崩溃或者有异常呢?

polhcujo

polhcujo1#

kafka producer应该是线程安全的,并且使用它的线程池非常节省。你可能想用

producer.flush();

而不是

producer.close();

在节目结束或你确定不再需要它之前,让制作人一直开放。
如果仍要关闭生产者,请按需重新创建。

producer = new KafkaProducer<String, byte[]>(prop);
rbl8hiat

rbl8hiat2#

如javadoc中所述 KafkaProducer :

public void close()

Close this producer. This method blocks until all previously sent requests complete.
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).

src公司:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/kafkaproducer.html#close()
因此,您不必担心您的消息不会被发送,即使您在发送后立即拨打close。
如果您计划多次使用kafkaproducer,请仅在使用完毕后关闭它。如果您仍然希望保证消息在方法完成之前实际发送,而不是在缓冲区中等待,那么使用 KafkaProducer#flush() 在发送当前缓冲区之前将被阻止。你也可以阻止 Future#get() 如果你愿意的话。
如果你不打算关闭kafkaproducer,还有一个要注意的问题(例如,在短期应用程序中,你只发送一些数据,发送后应用程序立即终止)。kafkaproducer io线程是守护进程线程,这意味着jvm不会等到该线程完成后才终止vm。因此,要确保您的消息真正被发送,请使用 KafkaProducer#flush() ,无参数 KafkaProducer#close() 或阻止 Future#get() .

zf2sa74q

zf2sa74q3#

这个 KafkaProducer.send 方法是异步的,它返回 Future[RecordMetadata] . 如果你 close 在发送之后,立即就有了竞争条件,并且由于 KafkaProducer 你的消息可能永远不会被发送。
如果您的生产者在应用程序的整个生命周期中都在使用,请不要关闭它,一旦应用程序终止,就让它死掉。如文档中所述,生产者在多线程环境中使用是安全的,因此您应该重用同一示例。
如果你仍然认为你需要关闭 KafkaProducer 在某些情况下,可以添加 isClosed 在kafka对象中标记它,并在使用者需要重新发送数据时监视它。草图可以是:

object KafkaOwner {
  private var producer: KafkaProducer = ???
  @volatile private var isClosed = false

  def close(): Unit = {
    if (!isClosed) {
      kafkaProducer.close()
      isClosed = true
    }
  }

  def instance: KafkaProducer = {
    this.synchronized {
      if (!isClosed) producer 
      else {
        producer = new KafkaProducer()
        isClosed = false
      }
    }
  }
}

相关问题