这是我的自定义 Dockerfile,其中包含 Flink 和 Beam SDK:
# Custom Flink image bundling Python 3.7, the PyFlink API, and the Apache Beam
# SDK, with the Beam boot entrypoint copied from the official SDK harness image.
FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
ARG KAFKA_VERSION=2.8.0
# Keep BEAM_VERSION in sync with the beam_python3.7_sdk tag in COPY --from
# below: a version skew between the pip-installed Beam SDK and the harness
# files breaks portable pipelines at runtime.
ARG BEAM_VERSION=2.48.0
# Build Python 3.7 from source (the Flink base image ships no Python 3.7).
RUN set -ex; \
    apt-get update && \
    apt-get install -y \
        build-essential \
        libbz2-dev \
        libffi-dev \
        liblzma-dev \
        libssl-dev \
        lzma \
        zlib1g-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    # --enable-shared: PyFlink loads libpython from Java via JNI.
    ./configure --without-tests --enable-shared && \
    make -j4 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    # --no-cache-dir keeps the pip download cache out of the image layer.
    python -m pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir \
        apache-flink==${FLINK_VERSION} \
        kafka-python && \
    # Pin Beam to the same version as the SDK harness image copied below.
    pip install --no-cache-dir "apache-beam[gcp]==${BEAM_VERSION}"
# Copy the Beam boot script and its dependencies from the official SDK image
# so a PROCESS environment can launch /opt/apache/beam/boot inside this image.
COPY --from=apache/beam_python3.7_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
Deployment.yaml
# ClusterIP Service exposing the Flink JobManager's RPC, blob-server and
# web UI ports to the rest of the cluster.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
---
# Service fronting the Beam external worker pool that runs as a sidecar in
# the taskmanager pods; the pipeline's EXTERNAL environment points here.
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
# Single-replica Flink JobManager built from the custom Flink+Beam image.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          # NOTE(review): ':latest' with IfNotPresent relies on the image
          # already being present on the node; a versioned tag is safer.
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            # Beam artifact staging area shared with the taskmanager pods.
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999  # 'flink' user of the official image
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
# Flink TaskManager plus a Beam SDK worker-pool sidecar sharing the artifact
# staging PVC.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            # Beam artifact staging area shared with the jobmanager and the
            # worker-pool sidecar below.
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999  # 'flink' user of the official image
        - name: beam-worker-pool
          # FIX: pin the harness tag. An untagged image floats to ':latest',
          # which can diverge from the Beam 2.48.0 SDK used by the pipeline
          # and the custom Flink image, causing harness/SDK version skew.
          image: apache/beam_python3.7_sdk:2.48.0
          args: ["--worker_pool"]
          ports:
            - containerPort: 50000
              name: pool
          livenessProbe:
            tcpSocket:
              port: 50000
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          # NOTE(review): this container runs as the image's default user while
          # the taskmanager writes staging artifacts as UID 9999 — confirm the
          # PVC permissions allow both, or add runAsUser: 9999 here as well.
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
下面是我尝试在容器镜像apache/beam_python3.7_sdk:2.48.0中运行的Beam作业
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
def run_beam_pipeline():
    """Run a streaming Beam job on the portable Flink runner.

    Reads string records from the Kafka topic 'data' through the Java KafkaIO
    cross-language transform and prints each message. Python user code runs in
    the EXTERNAL worker pool (beam-worker-pool:50000); the KafkaIO expansion
    service's Java workers run as a local PROCESS via the Beam boot script.
    """
    logging.getLogger().setLevel(logging.INFO)

    # Consumer settings handed verbatim to the Java KafkaIO transform.
    kafka_consumer_config = {
        'bootstrap.servers': 'cluster-0-kafka-bootstrap:9092',
        'group.id': 'beamgrouptest',
        'auto.offset.reset': 'earliest',
        'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
    }
    source_topic = 'data'

    # Submit to the jobmanager REST endpoint; run harness code externally.
    pipeline_options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=flink-jobmanager:8081",
        "--artifacts_dir=/tmp/beam-artifact-staging",
        "--environment_type=EXTERNAL",
        "--environment_config=beam-worker-pool:50000",
    ])

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | "Read from Kafka" >> ReadFromKafka(
                consumer_config=kafka_consumer_config,
                topics=[source_topic],
                with_metadata=False,
                # Run the expansion service's workers as a subprocess using
                # the boot entrypoint baked into the container image.
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
                    ]
                )
            )
            | "Print messages" >> beam.Map(print)
        )

    logging.info("Pipeline execution completed.")


if __name__ == '__main__':
    run_beam_pipeline()
> ERROR that I am getting on the task manager
2023-08-04 11:38:20,857 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@722335b3 2023-08-04 11:38:20,864 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend 2023-08-04 11:38:20,865 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager' 2023-08-04 11:38:20,872 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING. 2023-08-04 11:38:21,761 WARN org.apache.flink.metrics.MetricGroup [] - The operator name [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} exceeded the 80 characters length limit and was truncated. 2023-08-04 11:38:26,227 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected. 2023-08-04 11:38:26,288 WARN org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Logging client failed unexpectedly. org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException: CANCELLED: client cancelled at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status.asRuntimeException(Status.java:530) ~[blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] 
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [blob_p-dd40044e778dd6367f67c2c36a355148bf4a84e4-56a27c85764b7818c4cbb91ed427fa9e:?] 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?] at java.lang.Thread.run(Unknown Source) [?:?] 2023-08-04 11:38:26,562 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected. 2023-08-04 11:38:30,556 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/opt/apache/beam/boot' for worker id 1-1 2023-08-04 11:38:30,676 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) 2023-08-04 11:38:30,683 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0). 2023-08-04 11:38:30,748 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 c2ced87e2df3c8340f7f2a1591a7366c_cbc357ccb763df2852fee8c4fc7d55f2_0_0. 
2023-08-04 11:38:31,385 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0). 2023-08-04 11:38:31,386 INFO org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from INITIALIZING to CANCELING. 2023-08-04 11:38:31,387 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0). 2023-08-04 11:38:31,476 INFO org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from CANCELING to CANCELED. 2023-08-04 11:38:31,476 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0). 2023-08-04 11:38:31,486 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 c2ced87e2df3c8340f7f2a1591a7366c_03f93075562d7d50bb0b07080b2ebe35_0_0. 
2023-08-04 11:38:31,823 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=192.000mb (201326587 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=64.000mb (67108865 bytes)}, allocationId: b3fd6c8cb7eb19827ddaa4abee90c774, jobId: 4b59f015afa0eb4c041c3a96ff7d299d). 2023-08-04 11:38:31,857 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 4b59f015afa0eb4c041c3a96ff7d299d from job leader monitoring. 2023-08-04 11:38:31,860 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 4b59f015afa0eb4c041c3a96ff7d299d. 2023-08-04 11:38:52,429 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected. 2023-08-04 11:38:52,566 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - Beam Fn Control client connected with id 1-2 Aug 04, 2023 11:38:52 AM org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise notifyListener0 WARNING: An exception was thrown by org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport$1TerminationNotifier.operationComplete() java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p54p0/com/google/common/collect/Lists at org.apache.beam.vendor.grpc.v1p54p0.com.google.common.collect.ImmutableList.indexOf(ImmutableList.java:428) at org.apache.beam.vendor.grpc.v1p54p0.com.google.common.collect.ImmutableList.contains(ImmutableList.java:438) at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport.getLogLevel(NettyServerTransport.java:195) at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport.notifyTerminated(NettyServerTransport.java:203) at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport.access$100(NettyServerTransport.java:51) at 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport$1TerminationNotifier.operationComplete(NettyServerTransport.java:141) at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerTransport$1TerminationNotifier.operationComplete(NettyServerTransport.java:134) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) at org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164)
> ERROR in BEAM-WORKER-POOL
Logging client failed: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "Socket closed"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Socket closed", grpc_status:14, created_time:"2023-08-04T14:34:57.878849834+00:00"}"
>... resetting
Traceback (most recent call last):
File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 363, in <module>
main(sys.argv)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 212, in main
sdk_harness.run()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 262, in run
for work_request in self._control_stub.Control(get_responses()):
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 475, in __next__
return self._next()
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 881, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = ""
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"", grpc_status:2, created_time:"2023-08-04T14:34:57.947519126+00:00"}"
1条答案
按热度按时间fzsnzjdm1#
我还不知道怎么用 FlinkRunner。到目前为止,我只成功地使用了 PortableRunner。从长远来看,我认为 FlinkRunner 将被弃用,所以可能值得尝试一下 PortableRunner?
供你参考,这是我的作业所用的参数。这些参数用于本地 docker-compose 测试,这就是为什么你看到的值是 localhost。
对于 Kubernetes,我实际上将 Python worker harness 作为 PROCESS 运行,因此任务管理器和 worker harness 都在同一个 Pod 的同一个容器中。以下是我在 Kubernetes 上运行 Beam 作业所用的参数:
以下是我的设置的简化视图:
你也可以从这个gist找到我的本地测试。