Flink 配置未指定检查点目录“state.checkpoints.dir”

at0kjp5o  于 2023-02-17  发布在  Apache
关注(0)|答案(1)|浏览(449)

在dataproc集群上提交flink作业时收到以下错误。请查找代码库和错误。我使用的是flink 1.9.3版本。

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: f064ceaa5b318fdad9a77b2723b9ee64)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
        at org.flink.ReadFromPubsub.main(ReadFromPubsub.java:28)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:333)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
        at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:376)
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
        ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:303)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
        at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
        at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
        ... 10 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the RocksDB state backend: The configuration does not specify the checkpoint directory 'state.checkpoints.dir'
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:44)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
        at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:154)
        at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:219)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:299)
        ... 20 more

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        ... 4 more

我执行的代码片段

public class ReadFromPubsub
{

    public static void main(String args[]) throws Exception 
    {
        System.out.println("Flink Pubsub Code Read 1");
        
        
        
        StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
       
        
        
        
        System.out.println("Flink Pubsub Code Read 2");
        DeserializationSchema<String> deserializer = new SimpleStringSchema();
        System.out.println("Flink Pubsub Code Read 3");
        SourceFunction<String> pubsubSource = PubSubSource.newBuilder()
                                                            .withDeserializationSchema(deserializer)
.withProjectName("vz-it-np-gudv-dev-vzntdo-0")
                                                           .withSubscriptionName("subscription1")
                                                              .build();
        System.out.println("Flink Pubsub Code Read 4");
        streamExecEnv.addSource(pubsubSource).print();
        streamExecEnv.enableCheckpointing(10);
        System.out.println("Flink Pubsub Code Read 5");
        streamExecEnv.execute();
    }
}

我可以看到代码执行过程中的所有print语句。在最后一条print语句之后,我得到了错误。
所有要解决的异常

5gfr0r5j

5gfr0r5j1#

一般来说,你可以在flink-conf.yaml中的Flink配置中提供适当的保存点/检查点目录,如in the docs所示。如果你当前没有设置它,你可以通过以下方式来设置:

state.checkpoints.dir: "file://example/checkpoints"
编程方式(在作业内):
Configuration configuration = new Configuration();
conf.setString("state.checkpoints.dir", "file://example/checkpoints");

StreamExecutionEnvironment streamExecEnv = 
    StreamExecutionEnvironment.getExecutionEnvironment(configuration);
通过CLI:
flink run -Dstate.checkpoints.dir="/example/checkpoints" your-job.jar

此外,如果您实际上并不“想要”执行检查点操作,则可以在作业中删除以下配置:

streamExecEnv.enableCheckpointing(10);

相关问题