为什么在Flink的“EXACTLY_ONCE”模式下Kafka生成器对每个提交都关闭

juud5qan  于 2022-12-25  发布在  Apache
关注(0)|答案(1)|浏览(373)

我在Flink应用程序中使用了flink-connector-kafka,语义设置为EXACTLY_ONCE,我看到下面的日志表明Kafka生产者已经关闭并重新连接:

Closing the Kafka producer with timeoutMillis = 0 ms.
Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.

查看源代码,我发现了来自生产者提交函数的close调用。提交函数调用finally块中的recycleTransactionalProducerrecycleTransactionalProducer函数调用close函数,后者打印日志。为什么Kafka生产者每次提交都被关闭?
软件包中的源代码:
org.apache.flink.streaming.connectors.kafka;
org.apache.kafka.clients.producer;

@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}

private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
    availableTransactionalIds.add(producer.getTransactionalId());
    producer.flush();
    producer.close(Duration.ofSeconds(0));
}

private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
alen0pnh

alen0pnh1#

引用自http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problems-with-FlinkKafkaProducer-closing-after-timeoutMillis-9223372036854775807-ms-td39488.html
...当对FlinkKafkaProducer使用exactly-once语义时,会为每个并发检查点创建一个固定大小的短期Kafka生成器池。当检查点开始时,FlinkKafkaProducer会为该检查点创建一个新的生成器。一旦所述检查点完成,则会尝试关闭并回收该检查点的生成器。因此,如果您使用的是一次事务性的FlinkKafkaProducer,那么看到Kafka生产者的日志被关闭是很正常的。

相关问题