Running the Flink Runner on Kubernetes with the Apache Beam Python SDK, but facing issues

jjhzyzn0 · posted 2023-09-28 in Apache

Here is my custom Dockerfile, which includes Flink and the Beam SDK:

FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
ARG KAFKA_VERSION=2.8.0

# Install Python 3.7 from source (to match the beam_python3.7 SDK harness copied below)
RUN set -ex; \
  apt-get update && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
  wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
  tar -xvf Python-3.7.9.tgz && \
  cd Python-3.7.9 && \
  ./configure --without-tests --enable-shared && \
  make -j4 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/* && \
  python -m pip install --upgrade pip; \
  pip install apache-flink==${FLINK_VERSION}; \
  pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.7_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
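
One thing worth sanity-checking with an image like this (a hypothetical helper, not part of my original setup) is that the interpreter and library versions baked into the image line up with the 2.48.0 harness files copied above. A minimal probe, run inside the image with something like docker run --rm custom-flink:latest python version_probe.py:

# version_probe.py -- hypothetical sanity check, run inside the custom image.
# Confirms the Python build and installed libraries match what the copied
# /opt/apache/beam/ harness (from beam_python3.7_sdk:2.48.0) expects.
import sys
import pkg_resources  # ships with setuptools; importlib.metadata needs 3.8+

print("python        :", sys.version.split()[0])  # expect 3.7.9
for dist in ("apache-beam", "apache-flink", "kafka-python"):
    try:
        print(f"{dist:14}:", pkg_resources.get_distribution(dist).version)
    except pkg_resources.DistributionNotFound:
        print(f"{dist:14}: NOT INSTALLED")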

Deployment.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging # New volume mount
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging
        persistentVolumeClaim:
          claimName:  staging-artifacts-claim # New volume
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging # New volume mount
        securityContext:
          runAsUser: 9999
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.48.0 # pin the tag so the worker pool matches the harness copied into custom-flink
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging # New volume mount
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging # New volume
        persistentVolumeClaim:
          claimName:  staging-artifacts-claim
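
Note that the jobmanager, the taskmanager, and the beam-worker-pool sidecar all mount the same staging-artifacts-claim at /tmp/beam-artifact-staging, so the claim needs to be shareable across those pods (e.g. a ReadWriteMany-capable storage class if the jobmanager and taskmanager land on different nodes). A throwaway probe to confirm the containers really see one volume (hypothetical helper, not part of the manifests above):

# staging_probe.py -- run "python staging_probe.py write" in one container
# (kubectl exec into flink-jobmanager), then "python staging_probe.py" in
# another (the beam-worker-pool container); the marker should be visible.
import os
import sys
import time

STAGING = "/tmp/beam-artifact-staging"
MARKER = os.path.join(STAGING, "probe-marker")

if len(sys.argv) > 1 and sys.argv[1] == "write":
    with open(MARKER, "w") as f:
        f.write(str(time.time()))  # leave a timestamped marker behind
    print("wrote", MARKER)
else:
    print("marker visible:", os.path.exists(MARKER))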

Below is the Beam job I am trying to run inside the container image apache/beam_python3.7_sdk:2.48.0:

import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

def run_beam_pipeline():
    logging.getLogger().setLevel(logging.INFO)

    consumer_config = {
        'bootstrap.servers': 'cluster-0-kafka-bootstrap:9092',
        'group.id': 'beamgrouptest',
        'auto.offset.reset': 'earliest',
        'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
    }

    topic = 'data'

    flink_options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=flink-jobmanager:8081",        # jobmanager Service, REST/web UI port
        "--artifacts_dir=/tmp/beam-artifact-staging",  # the shared PVC mount
        "--environment_type=EXTERNAL",
        "--environment_config=beam-worker-pool:50000", # the worker-pool Service defined above
    ])

    with beam.Pipeline(options=flink_options) as pipeline:
        messages = (
            pipeline
            | "Read from Kafka" >> ReadFromKafka(
                consumer_config=consumer_config,
                topics=[topic],
                with_metadata=False,
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
                    ]
                )
            )
            | "Print messages" >> beam.Map(print)
        )

    logging.info("Pipeline execution completed.")

if __name__ == '__main__':
    run_beam_pipeline()
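
Before chasing the cluster-side failure below, it can help to rule out the pipeline code itself; here is a smoke test of the same pipeline shape on the DirectRunner, with an in-memory source swapped in for Kafka (my sketch, not part of the original job):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Same pipeline shape, but an in-memory source on the DirectRunner, so a
# failure here points at the pipeline code rather than Flink or Kafka.
with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
    (
        p
        | "Fake Kafka records" >> beam.Create([(b"key", b"value")])
        | "Print messages" >> beam.Map(print)
    )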

> Error I am getting on the task manager:

2023-08-04 11:38:20,857 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@722335b3
2023-08-04 11:38:20,864 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2023-08-04 11:38:20,865 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2023-08-04 11:38:20,872 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2023-08-04 11:38:21,761 WARN org.apache.flink.metrics.MetricGroup [] - The operator name [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} exceeded the 80 characters length limit and was truncated.
2023-08-04 11:38:26,227 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-08-04 11:38:26,288 WARN org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status.asRuntimeException(Status.java:530) ~[blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
2023-08-04 11:38:26,562 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-08-04 11:38:30,556 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/opt/apache/beam/boot' for worker id 1-1
2023-08-04 11:38:30,676 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
2023-08-04 11:38:30,683 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
2023-08-04 11:38:30,748 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-08-04 11:38:31,385 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0).
2023-08-04 11:38:31,386 INFO org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from INITIALIZING to CANCELING.
2023-08-04 11:38:31,387 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0).
2023-08-04 11:38:31,476 INFO org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from CANCELING to CANCELED.
2023-08-04 11:38:31,476 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0).
2023-08-04 11:38:31,486 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0.
2023-08-04 11:38:31,823 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=192.000mb (201326587 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=64.000mb (67108865 bytes)}, allocationId: b3fd6c8cb7eb19827ddaa4abee90c774, jobId: 4b59f015afa0eb4c041c3a96ff7d299d).
2023-08-04 11:38:31,857 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 4b59f015afa0eb4c041c3a96ff7d299d from job leader monitoring.
2023-08-04 11:38:31,860 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 4b59f015afa0eb4c041c3a96ff7d299d.
2023-08-04 11:38:52,429 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-08-04 11:38:52,566 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - Beam Fn Control client connected with id 1-2
Aug 04, 2023 11:38:52 AM org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise notifyListener0
WARNING: An exception was thrown by org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport$1TerminationNotifier.operationComplete()
java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p54p0/com/google/common/collect/Lists
    at org.apache.beam.vendor.grpc.v1p54p0.com.google.common.collect.ImmutableList.indexOf(ImmutableList.java:428)
    at org.apache.beam.vendor.grpc.v1p54p0.com.google.common.collect.ImmutableList.contains(ImmutableList.java:438)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport.getLogLevel(NettyServerTransport.java:195)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport.notifyTerminated(NettyServerTransport.java:203)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport.access$100(NettyServerTransport.java:51)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport$1TerminationNotifier.operationComplete(NettyServerTransport.java:141)
    at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport$1TerminationNotifier.operationComplete(NettyServerTransport.java:134)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164)

> Error in the beam-worker-pool container:

Logging client failed: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Socket closed"
    debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Socket closed", grpc_status:14, created_time:"2023-08-04T14:34:57.878849834+00:00"}"
>... resetting
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 363, in <module>
    main(sys.argv)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 212, in main
    sdk_harness.run()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 262, in run
    for work_request in self._control_stub.Control(get_responses()):
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 475, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 881, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"", grpc_status:2, created_time:"2023-08-04T14:34:57.947519126+00:00"}"
Answer 1 (fzsnzjdm):

I haven't figured out how to use the FlinkRunner yet; so far I have only had success with the PortableRunner. In the long run I think the FlinkRunner will be deprecated, so it may be worth trying the PortableRunner?
For your reference, these are the arguments for my job. They are for a local docker-compose test, which is why you see localhost values:

--runner=portableRunner
--job_endpoint=localhost:8099
--artifact_endpoint=localhost:8098
--environment_type=EXTERNAL
--environment_config=host.docker.internal:50000
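
In code, those flags would look something like this (a sketch of mine, assuming the same local endpoints):

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent of the docker-compose flags above, built programmatically. The
# endpoints (localhost:8099/8098, host.docker.internal:50000) are the
# local-test values from my setup.
local_options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",       # Beam job server
    "--artifact_endpoint=localhost:8098",  # artifact staging service
    "--environment_type=EXTERNAL",
    "--environment_config=host.docker.internal:50000",  # worker pool address
])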

For Kubernetes, I actually run the Python worker harness as a PROCESS, so the taskmanager and the worker harness live in the same pod and the same container. Here are the arguments for my Beam job on Kubernetes:

--runner=portableRunner
--environment_type=PROCESS
--environment_config='{"command":"/opt/apache/beam/boot"}'
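
And the Kubernetes variant as a sketch; note that in Python the environment_config value is the raw JSON string (the single quotes above are only shell quoting):

from apache_beam.options.pipeline_options import PipelineOptions

# PROCESS mode: the runner forks /opt/apache/beam/boot inside the taskmanager
# container itself, so no separate worker-pool sidecar is needed.
k8s_options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_type=PROCESS",
    '--environment_config={"command": "/opt/apache/beam/boot"}',
])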

Here is a simplified view of my setup:

  • Dotted boxes are things that are grouped together in my Kubernetes environment
  • Solid boxes are the containers I create in my local docker-compose.yaml

You can also find my local test in this gist.
