我在Flink应用程序中使用了flink-connector-kafka
,语义设置为EXACTLY_ONCE
,我看到下面的日志表明Kafka生产者已经关闭并重新连接:
Closing the Kafka producer with timeoutMillis = 0 ms.
Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
查看源代码,我发现了来自生产者提交函数的close调用。提交函数调用finally块中的recycleTransactionalProducer
,recycleTransactionalProducer
函数调用close函数,后者打印日志。为什么Kafka生产者每次提交都被关闭?
软件包中的源代码:org.apache.flink.streaming.connectors.kafka;
org.apache.kafka.clients.producer;
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
}
}
private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
availableTransactionalIds.add(producer.getTransactionalId());
producer.flush();
producer.close(Duration.ofSeconds(0));
}
private void close(Duration timeout, boolean swallowException) {
long timeoutMs = timeout.toMillis();
if (timeoutMs < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
1条答案
按热度按时间alen0pnh1#
引用自http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-FlinkKafkaProducer-closing-after-timeoutMillis-9223372036854775807-ms-td39488.html:
...当对FlinkKafkaProducer使用exactly-once语义时,会为每个并发检查点创建一个固定大小的短期Kafka生成器池。当检查点开始时,FlinkKafkaProducer会为该检查点创建一个新的生成器。一旦所述检查点完成,则会尝试关闭并回收该检查点的生成器。因此,如果您使用的是一次事务性的FlinkKafkaProducer,那么看到Kafka生产者的日志被关闭是很正常的。