我有一个多线程应用程序,它使用producer类生成消息,早些时候我使用下面的代码为每个请求创建producer。其中kafkaproducer是用每个请求新建的,如下所示:
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
然后我读了kafka关于producer的文档,知道我们应该使用单个producer示例来获得良好的性能。
然后我在一个单例类中创建了kafkaproducer的一个示例。
现在我们应该在何时何地关闭生产商。显然,如果我们在第一次发送请求后关闭生产者,它不会发现生产者重新发送消息,因此抛出:
java.lang.IllegalStateException: Cannot send after the producer is closed.
或者我们如何在关闭后重新连接到生产商。问题是如果程序崩溃或者有异常呢?
3条答案
按热度按时间polhcujo1#
kafka producer应该是线程安全的,并且使用它的线程池非常节省。你可能想用
而不是
在节目结束或你确定不再需要它之前,让制作人一直开放。
如果仍要关闭生产者,请按需重新创建。
rbl8hiat2#
如javadoc中所述
KafkaProducer
:src公司:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/kafkaproducer.html#close()
因此,您不必担心您的消息不会被发送,即使您在发送后立即拨打close。
如果您计划多次使用kafkaproducer,请仅在使用完毕后关闭它。如果您仍然希望保证消息在方法完成之前实际发送,而不是在缓冲区中等待,那么使用
KafkaProducer#flush()
在发送当前缓冲区之前将被阻止。你也可以阻止Future#get()
如果你愿意的话。如果你不打算关闭kafkaproducer,还有一个要注意的问题(例如,在短期应用程序中,你只发送一些数据,发送后应用程序立即终止)。kafkaproducer io线程是守护进程线程,这意味着jvm不会等到该线程完成后才终止vm。因此,要确保您的消息真正被发送,请使用
KafkaProducer#flush()
,无参数KafkaProducer#close()
或阻止Future#get()
.zf2sa74q3#
这个
KafkaProducer.send
方法是异步的,它返回Future[RecordMetadata]
. 如果你close
在发送之后,立即就有了竞争条件,并且由于KafkaProducer
你的消息可能永远不会被发送。如果您的生产者在应用程序的整个生命周期中都在使用,请不要关闭它,一旦应用程序终止,就让它死掉。如文档中所述,生产者在多线程环境中使用是安全的,因此您应该重用同一示例。
如果你仍然认为你需要关闭
KafkaProducer
在某些情况下,可以添加isClosed
在kafka对象中标记它,并在使用者需要重新发送数据时监视它。草图可以是: