如何使用Flink runner将Beam Python任务提交到Kubernetes上?

vtwuwzda  于 2023-09-28  发布在  Apache
关注(0)|答案(2)|浏览(163)

我想在Kubernetes内的Flink runner上使用Beam运行一个连续的流处理作业。我一直在这里学习这个教程(https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb),但我不确定作者在谈论“flink主容器”时指的是什么。我不明白我应该如何将我的Python代码提交到集群中,当代码在容器映像本身中定义时。
Kubernetes Flink集群架构看起来像这样:

  • 单个JobManager,通过Service和Ingress暴露Flink Web UI
    *多个任务管理器,每个任务管理器运行2个容器:
  • Flink任务管理器
  • Beam worker池,暴露端口50000

示例教程中的Python代码具有Beam配置,如下所示:

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.10",
    "--flink_master=localhost:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000"
])

很明显,当您按照教程在本地运行它时,它会与Beam worker池对话以启动应用程序。但是,如果我有一个包含我的应用程序代码的Docker镜像,并且我想在Kubernetes中启动这个应用程序,我应该在Kubernetes集群中的哪里部署这个镜像?是否作为eachTask Manager pod中的容器(因此使用localhost:50000与Beam通信)?或者我创建一个包含我的应用程序代码的单个pod,并将该pod指向我的任务管理器的端口50000-如果是这样,我有多个任务管理器是一个问题吗?
任何指向文档或示例的指针都将非常有帮助。这个other SO question有一个不完整的答案。

hc8w905p

hc8w905p1#

Flink Kubernetes Operator确实提供了一个Beam example,它解决了您面临的大多数工具问题。它是为Java编写的,但通过将Python源代码添加到Docker镜像中,您应该能够实现您正在寻找的内容。

hiz5n14c

hiz5n14c2#

使用flink的作业管理器和任务管理器的自定义图像
Dockerfile

FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
ARG KAFKA_VERSION=2.8.0

# Install python3.7 
RUN set -ex; \
  apt-get update && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
  wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
  tar -xvf Python-3.7.9.tgz && \
  cd Python-3.7.9 && \
  ./configure --without-tests --enable-shared && \
  make -j4 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/* && \
  python -m pip install --upgrade pip; \
  pip install apache-flink==${FLINK_VERSION}; \
  pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.7_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/

此外,在上述部署中,没有卷来存储staging-artifacts。因此,像这样创建PVC,并根据storageclassName调整PVC

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard

使用flink的自定义映像的部署yaml应该是这样的

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging 
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging
        persistentVolumeClaim:
          claimName:  staging-artifacts-claim 
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging 
          runAsUser: 9999
      - name: beam-worker-pool
        image: apache/beam_python3.11_sdk
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging 
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging 
        persistentVolumeClaim:
          claimName:  staging-artifacts-claim

以下是Flinkpipeline选项

flink_options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=flink-jobmanager:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=beam-worker-pool:50000",
    ])

从另一个容器运行Beam代码可以是cronjob/job或deployment,要使用python sdk运行Beam代码,请使用python代码创建一个映像,并确保使用apache/beam_python3.7_sdk:2.48.0并在其中安装java,以便扩展服务将启动,否则它将使用docker。
请根据需要调整sdk的版本

相关问题