I have a Flink job (Flink 1.9). I'm using the RocksDB state backend with incremental checkpoints to S3.
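For context, a setup like this is usually wired up in flink-conf.yaml roughly as shown below. This is a sketch only. The bucket path is taken from the log. The endpoint and credential values are placeholders, not the actual configuration. It also assumes that switching from MinIO to DigitalOcean Spaces mainly means changing `s3.endpoint`.

```yaml
# Sketch of flink-conf.yaml for the setup described (Flink 1.9); values are placeholders.
# RocksDB state backend with incremental checkpoints enabled.
state.backend: rocksdb
state.backend.incremental: true
# Checkpoints are written through the Presto-based S3 filesystem (s3p:// scheme).
state.checkpoints.dir: s3p://bucket/flink/prod/checkpoints
# Placeholder endpoint: a local MinIO server would be something like http://localhost:9000,
# DigitalOcean Spaces a regional endpoint such as https://nyc3.digitaloceanspaces.com
s3.endpoint: https://nyc3.digitaloceanspaces.com
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
```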
When I configure Flink to checkpoint to a local MinIO server, everything works fine.
When I configure Flink to checkpoint to DigitalOcean Spaces, restoring works fine, but checkpointing fails with the following error:
2020-07-07 21:30:11,510 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 133 @ 1594157411501 for job 349e8d686e9d49c9ad8385a9ca50d36a.
2020-07-07 21:30:15,163 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 133 by task fb723c8e833de43cc87d5cd839520009 of job 349e8d686e9d49c9ad8385a9ca50d36a at 19d2a2c8a14689ba2de194be8640d7ea @ c40280549c18 (dataPort=36531).
2020-07-07 21:30:15,164 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 133 of job 349e8d686e9d49c9ad8385a9ca50d36a.
java.lang.Exception: Could not materialize checkpoint 133 for operator Source: KafkaSource-01 (7/9).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1082)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1024)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3p://bucket/flink/prod/checkpoints/349e8d686e9d49c9ad8385a9ca50d36a/chk-133/a981fae1-e38a-4faa-b4e7-75bd84edfa83 in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(Unknown Source)
at java.util.concurrent.FutureTask.get(Unknown Source)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:993)
... 3 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to s3p://bucket/flink/prod/checkpoints/349e8d686e9d49c9ad8385a9ca50d36a/chk-133/a981fae1-e38a-4faa-b4e7-75bd84edfa83 in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(Unknown Source)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more
Caused by: java.io.IOException: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad request; Request ID: null; S3 Extended Request ID: null), S3 Extended Request ID: null
at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1045)
at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:996)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
at org.apache.flink.fs.s3.common.hadoop.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:320)
... 10 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad request; Request ID: null; S3 Extended Request ID: null), S3 Extended Request ID: null
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1749)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:133)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:125)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:143)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:48)
at java.util.concurrent.FutureTask.run(Unknown Source)
... 3 more
2020-07-07 21:30:15,171 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 133 from task 0c17da9cda212b48a1c6cfe396541f86 of job 349e8d686e9d49c9ad8385a9ca50d36a at 19d2a2c8a14689ba2de194be8640d7ea @ c40280549c18 (dataPort=36531).
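Based on these docs, my understanding is that Flink is speaking the s3v2 protocol to an s3v4 endpoint. Is there a way to configure Flink to use a specific protocol version?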
Thanks.