Setting up a FlinkSessionJob with S3 on MinIO using the Flink Kubernetes Operator

kadbb459 · posted 2023-09-28 · Apache

I am trying to use the Flink Kubernetes Operator to run an application with multiple jobs in a session cluster. My problem is that I need the checkpoints and savepoints to be stored in S3. I am using a local k3d installation, and I also have a MinIO pod set up as the S3 storage.
So I created a custom image based on https://github.com/apache/flink-kubernetes-operator/releases/tag/release-1.4.0, adding s3-fs-hadoop and s3-fs-presto directories under the plugins directory in /opt/flink, into which I copied the corresponding jars from the image's /opt/flink/opt directory.
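For reference, the layout Flink expects (and that the error message further down also spells out) is one subfolder per plugin under /opt/flink/plugins, roughly:

    /opt/flink/plugins
    ├── s3-fs-hadoop
    │   └── flink-s3-fs-hadoop-1.16.1.jar
    └── s3-fs-presto
        └── flink-s3-fs-presto-1.16.1.jar
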
I then created a FlinkDeployment resource using that custom image, and I can see that the deployment appears to start up successfully with the S3 plugins enabled. The initial logs of the FlinkDeployment look like this:

Enabling required built-in plugins
    Linking flink-s3-fs-hadoop-1.16.1.jar to plugin directory
    Successfully enabled flink-s3-fs-hadoop-1.16.1.jar
    Linking flink-s3-fs-presto-1.16.1.jar to plugin directory
    Successfully enabled flink-s3-fs-presto-1.16.1.jar
    sed: couldn't open temporary file /opt/flink/conf/sedNLg8lR: Read-only file system
    sed: couldn't open temporary file /opt/flink/conf/sed6vCEtB: Read-only file system
    /docker-entrypoint.sh: line 74: /opt/flink/conf/flink-conf.yaml: Read-only file system
    /docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
    Starting kubernetes-session as a console application on host my-flink-cluster-5967dc856b-rlznq

Also, the FlinkDeployment Kubernetes manifest used is the following:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-cluster
spec:
  image: k3d-test-app-registry:5050/flink-kube-s3:v1 
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    kubernetes.operator.periodic.savepoint.interval: 1h
    state.savepoints.dir: "s3://state/minio-service:9000"
    state.checkpoints.dir: "s3://state/minio-service:9000"
    s3.endpoint: http://minio-service:9000
    s3.path.style.access: "true"
    s3.access-key: ****
    s3.secret-key: ****
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
      - name: flink-main-container
        env:
        - name: FLINK_PARALLELISM
          value: "3"
        - name: CHECKPOINT_DIR
          value: "s3://state/minio-service:9000"
        - name: ENABLE_BUILT_IN_PLUGINS
          value: flink-s3-fs-hadoop-1.16.1.jar;flink-s3-fs-presto-1.16.1.jar

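For comparison, the Flink documentation examples use plain bucket/path URIs for the state directories and keep the endpoint only in s3.endpoint; a hypothetical variant of the flinkConfiguration above, assuming a bucket named state, would be:

    state.savepoints.dir: "s3://state/savepoints"
    state.checkpoints.dir: "s3://state/checkpoints"
    s3.endpoint: http://minio-service:9000
    s3.path.style.access: "true"
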
Unfortunately, when I try to deploy a test FlinkSessionJob based on the example provided by Flink, using the following manifest:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-flink-job
spec:
  deploymentName: my-flink-cluster
  flinkConfiguration:
    kubernetes.operator.periodic.savepoint.interval: 1h
    state.savepoints.dir: "s3://state/minio-service:9000"
    state.checkpoints.dir: "s3://state/minio-service:9000"
    s3.endpoint: http://minio-service:9000
    s3.path.style.access: "true"
    s3.access-key: ****
    s3.secret-key: ****
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 2
    upgradeMode: stateless

I get the following error in the FlinkDeployment pod logs:

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:337) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:245) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:511) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:317) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.16.1.jar:1.16.1]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
        ... 1 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

This seems to point to the S3 jars being missing from the plugins, even though when I attach an interactive terminal to the FlinkDeployment pod I can see that the S3 jars are present in the plugins directory.
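(The check was along these lines, using the JobManager pod name from the logs above:)

    kubectl exec -it my-flink-cluster-5967dc856b-rlznq -- ls /opt/flink/plugins/s3-fs-hadoop
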
I also tried the approach from the following Stack Overflow question about using S3 (Getting JAR file from S3 using Flink Kubernetes operator), but unfortunately the result was the same.
Can you point out what I am doing wrong or what I am missing? Has anyone used a FlinkSessionJob with S3, and could you give me a relevant example?

qjp7pelc #1

This may be a bit late, but you only need one of the two libraries.
From the docs:
For most use cases, you may use one of our flink-s3-fs-hadoop and flink-s3-fs-presto plugins.
You do not need to add additional Hadoop dependencies.
Flink provides two file systems to talk to Amazon S3, flink-s3-fs-presto and flink-s3-fs-hadoop. Both implementations are self-contained with no dependency footprint.
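A minimal sketch of that change against the FlinkDeployment above, keeping only the Hadoop flavor both in the custom image's plugins directory and in the environment variable (the Presto one would work just as well):

    env:
    - name: ENABLE_BUILT_IN_PLUGINS
      value: flink-s3-fs-hadoop-1.16.1.jar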
