Flink Kafka consumer - changing the retry count for a timeout exception

Asked by f1tvaqid, 12 months ago, in Apache

I have a Flink job that consumes data from Kafka using Flink's KafkaSource (the "flink-connector-kafka" library).
I occasionally get the exception "Failed to get metadata for all topics" (we are investigating Kafka and network issues on our side). However, I can see in the exception log that only a single attempt was made against Kafka.
Is it possible to increase the number of attempts beyond one, and if so, how?
Exception log:
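For context, the source is built roughly like the sketch below (broker address, topic pattern, and group id are placeholders); the `TopicPatternSubscriber` in the stack trace below suggests a topic pattern is used, and periodic partition discovery is what triggers the repeated metadata calls:

```java
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")                 // placeholder
        .setTopicPattern(Pattern.compile("events-.*"))      // placeholder pattern
        .setGroupId("my-consumer-group")                    // placeholder
        .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // periodic partition discovery drives the listTopics metadata calls
        // that fail in the log below
        .setProperty("partition.discovery.interval.ms", "60000")
        .build();
```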

at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to 
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:133)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 6 more
Caused by: java.lang.RuntimeException: Failed to get metadata for all topics.
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getAllTopicMetadata(KafkaSubscriberUtils.java:37)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicPatternSubscriber.getSubscribedTopicPartitions(TopicPatternSubscriber.java:48)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:130)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1703430983336, tries=1, nextAllowedTryMs=1703430983497) timed out at 1703430983397 after 1 attempt(s)
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getAllTopicMetadata(KafkaSubscriberUtils.java:34)
    ... 9 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1703430983336, tries=1, nextAllowedTryMs=1703430983497) timed out at 1703430983397 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled listTopics request with correlation id 66857 due to node 1001 being disconnected


muk1a3rh:

I don't think there is any way to do this directly.
However, you can configure a restart strategy so that the error does not fail the job terminally. You will also need to enable checkpointing for this to work effectively.
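A minimal sketch of this suggestion, assuming a fixed-delay restart strategy (the retry count, delay, and checkpoint interval below are illustrative, not recommendations):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing so a restarted job resumes from the last
// checkpointed Kafka offsets instead of losing its position
env.enableCheckpointing(60_000L);

// Restart the job up to 3 times, waiting 10 seconds between attempts,
// instead of failing terminally when the metadata call times out
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
```

With this in place, a transient `TimeoutException` from the enumerator still fails the running job, but the restart strategy brings it back up and checkpointing restores the consumer offsets.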
