I'm trying to use the Flink Kubernetes Operator to run an application with multiple jobs in a session cluster. My problem is that I need to use S3 storage for checkpoints and savepoints. I'm using a local k3d installation and also have a MinIO pod set up as the S3 storage.
So I created a custom image based on https://github.com/apache/flink-kubernetes-operator/releases/tag/release-1.4.0, added s3-fs-hadoop and s3-fs-presto directories under the plugins directory in /opt/flink, and copied the corresponding JARs from the image's /opt/flink/opt directory into them.
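That is, the resulting plugins layout inside the image looks like this (JAR names assumed to be the Flink 1.16.1 ones, matching the logs below):

/opt/flink/plugins/
├── s3-fs-hadoop/
│   └── flink-s3-fs-hadoop-1.16.1.jar
└── s3-fs-presto/
    └── flink-s3-fs-presto-1.16.1.jar

This follows the rule (also cited in the error further down) that each plugin must reside in its own subfolder under the plugins directory.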
I then created a FlinkDeployment resource using that custom image, and I can see that the deployment appears to start successfully with the S3 plugins enabled. The initial logs of the FlinkDeployment look like this:
Enabling required built-in plugins
Linking flink-s3-fs-hadoop-1.16.1.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.16.1.jar
Linking flink-s3-fs-presto-1.16.1.jar to plugin directory
Successfully enabled flink-s3-fs-presto-1.16.1.jar
sed: couldn't open temporary file /opt/flink/conf/sedNLg8lR: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sed6vCEtB: Read-only file system
/docker-entrypoint.sh: line 74: /opt/flink/conf/flink-conf.yaml: Read-only file system
/docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting kubernetes-session as a console application on host my-flink-cluster-5967dc856b-rlznq
Additionally, the FlinkDeployment Kubernetes manifest used is the following:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-cluster
spec:
  image: k3d-test-app-registry:5050/flink-kube-s3:v1
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.periodic.savepoint.interval: 1h
    state.savepoints.dir: "s3://state/minio-service:9000"
    state.checkpoints.dir: "s3://state/minio-service:9000"
    s3.endpoint: http://minio-service:9000
    s3.path.style.access: "true"
    s3.access-key: ****
    s3.secret-key: ****
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: FLINK_PARALLELISM
              value: "3"
            - name: CHECKPOINT_DIR
              value: "s3://state/minio-service:9000"
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.16.1.jar;flink-s3-fs-presto-1.16.1.jar
Unfortunately, when I try to deploy a test FlinkSessionJob based on the examples provided by Flink, using the following manifest:
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-flink-job
spec:
  deploymentName: my-flink-cluster
  flinkConfiguration:
    kubernetes.operator.periodic.savepoint.interval: 1h
    state.savepoints.dir: "s3://state/minio-service:9000"
    state.checkpoints.dir: "s3://state/minio-service:9000"
    s3.endpoint: http://minio-service:9000
    s3.path.style.access: "true"
    s3.access-key: ****
    s3.secret-key: ****
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 2
    upgradeMode: stateless
I get the following error in the FlinkDeployment pod logs:
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:337) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:245) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:511) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:317) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.16.1.jar:1.16.1]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_372]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
... 1 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
This seems to point to S3 JARs missing from the plugins, although when I attach an interactive terminal to the FlinkDeployment pod, I can see that the S3 JARs are present in the plugins directory.
I also tried the approach from the Stack Overflow question Getting JAR file from S3 using Flink Kubernetes operator, but unfortunately the result is the same.
Can you point out what I'm doing wrong or missing? Has anyone used FlinkSessionJob with S3, and could you give me a relevant example?
1 Answer
This may be a bit late, but you only need one of the two libraries.
From the docs:
For most use cases, you may use one of our flink-s3-fs-hadoop and flink-s3-fs-presto S3 filesystem plugins.
You do not need to add an extra Hadoop dependency:
Flink provides two file systems to talk to Amazon S3, flink-s3-fs-presto and flink-s3-fs-hadoop. Both implementations are self-contained with no dependency footprint.
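Applied to the manifests above, that advice means enabling just one of the two plugins, so the s3:// scheme resolves to a single implementation. A minimal, untested sketch of the relevant FlinkDeployment fragment, keeping the image, endpoint, and credentials from the question (the checkpoints/savepoints bucket paths are illustrative placeholders; note that state.checkpoints.dir and state.savepoints.dir take an s3://bucket/path URI, while the MinIO host:port belongs in s3.endpoint):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-cluster
spec:
  image: k3d-test-app-registry:5050/flink-kube-s3:v1
  flinkVersion: v1_16
  flinkConfiguration:
    # s3://<bucket>/<path> placeholders; the endpoint is configured separately below
    state.checkpoints.dir: "s3://state/checkpoints"
    state.savepoints.dir: "s3://state/savepoints"
    s3.endpoint: http://minio-service:9000
    s3.path.style.access: "true"
    s3.access-key: ****
    s3.secret-key: ****
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              # only one S3 filesystem plugin instead of both
              value: flink-s3-fs-presto-1.16.1.jar

With only flink-s3-fs-presto enabled, checkpoint and savepoint paths can keep the s3:// scheme; if you prefer the Hadoop implementation instead, enable flink-s3-fs-hadoop-1.16.1.jar alone.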