tl;dr
Is it possible in Kubernetes to connect two pods as if they were on the same local network, with all ports open?
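Motivation
Currently, we have deployed Airflow in a Kubernetes cluster, and in order to use TensorFlow Extended we need to use Apache Beam. For our use case Spark is the appropriate runner, and since Airflow and TensorFlow are coded in Python, we need to use Apache Beam's portable runner (https://beam.apache.org/documentation/runners/spark/#portability).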
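Problem
Communication between the Airflow pod and the job server pod results in transmission errors (probably because the job server uses some random ports).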
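Setup
To follow good isolation practices and to mimic the common Spark-on-Kubernetes setup (using the driver inside the cluster, in a pod), the job server is implemented as: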
apiVersion: apps/v1
kind: Deployment
metadata:
  name: beam-spark-job-server
  labels:
    app: airflow-k8s
spec:
  selector:
    matchLabels:
      app: beam-spark-job-server
  replicas: 1
  template:
    metadata:
      labels:
        app: beam-spark-job-server
    spec:
      restartPolicy: Always
      containers:
      - name: beam-spark-job-server
        image: apache/beam_spark_job_server:2.27.0
        args: ["--spark-master-url=spark://spark-master:7077"]
        resources:
          limits:
            memory: "1Gi"
            cpu: "0.7"
        env:
        - name: SPARK_PUBLIC_DNS
          value: spark-client
        ports:
        - containerPort: 8099
          protocol: TCP
          name: job-server
        - containerPort: 7077
          protocol: TCP
          name: spark-master
        - containerPort: 8098
          protocol: TCP
          name: artifact
        - containerPort: 8097
          protocol: TCP
          name: java-expansion
---
apiVersion: v1
kind: Service
metadata:
  name: beam-spark-job-server
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: beam-spark-job-server
  ports:
  - port: 8099
    protocol: TCP
    targetPort: 8099
    name: job-server
  - port: 7077
    protocol: TCP
    targetPort: 7077
    name: spark-master
  - port: 8098
    protocol: TCP
    targetPort: 8098
    name: artifact
  - port: 8097
    protocol: TCP
    targetPort: 8097
    name: java-expansion
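One way to separate "the pods cannot reach each other at all" from "the job server hands the client an address it cannot use" is to test plain TCP reachability from inside the Airflow pod. The sketch below is a hypothetical stdlib-only helper (`probe` and `JOB_SERVER_PORTS` are illustrative names, not part of Beam or Kubernetes) that tries to open a connection to each port the beam-spark-job-server Service declares. It only checks those declared Service ports; it says nothing about endpoints the job server may advertise back to the client during submission.

```python
import socket

# Ports declared on the beam-spark-job-server Service in the manifest above.
JOB_SERVER_PORTS = {
    "job-server": 8099,
    "artifact": 8098,
    "java-expansion": 8097,
    "spark-master": 7077,
}


def probe(host: str, ports: dict, timeout: float = 2.0) -> dict:
    """Return {port_name: bool}: True if a TCP connection to host:port succeeds."""
    results = {}
    for name, port in ports.items():
        try:
            # create_connection resolves the name (e.g. the Service DNS name)
            # and opens a TCP connection; the socket is closed immediately.
            with socket.create_connection((host, port), timeout=timeout):
                results[name] = True
        except OSError:
            # Covers refused connections, timeouts and DNS resolution failures.
            results[name] = False
    return results
```

Run from a Python shell in the Airflow pod, `probe("beam-spark-job-server", JOB_SERVER_PORTS)` should report True for every entry if the Service and its selector are wired correctly. If all ports are reachable and submission still fails, the problem is more likely an address the job server hands back than pod-to-pod networking.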
Development/Errors
If I run the command python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=beam-spark-job-server:8099 --environment_type=LOOPBACK
in the Airflow pod, nothing is logged on the job server and the following error appears in the terminal:
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:46569
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
Traceback (most recent call last):
File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/examples/wordcount.py", line 99, in <module>
run()
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/examples/wordcount.py", line 94, in run
ERROR:grpc._channel:Exception iterating requests!
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 195, in consume_request_iterator
request = next(request_iterator)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 355, in __next__
raise self._queue.get()
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
return self.runner.run_pipeline(self, self._options)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 421, in run_pipeline
job_service_handle.submit(proto_pipeline)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 115, in submit
prepare_response.staging_session_token)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 214, in stage
staging_session_token)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 241, in offer_artifacts
for request in requests:
File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
return self._next()
File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b"
debug_error_string = "{"created":"@1613765341.075846957","description":"Error received from peer ipv4:127.0.0.1:8098","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b","grpc_status":3}"
>
output | 'Write' >> WriteToText(known_args.output)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
self.result = self.run()
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
return self.runner.run_pipeline(self, self._options)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 421, in run_pipeline
job_service_handle.submit(proto_pipeline)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 115, in submit
prepare_response.staging_session_token)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 214, in stage
staging_session_token)
File "/home/airflow/.local/lib/python3.7/site-packages/apache_beam/runners/portability/artifact_service.py", line 241, in offer_artifacts
for request in requests:
File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
return self._next()
File "/home/airflow/.local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b"
debug_error_string = "{"created":"@1613765341.075846957","description":"Error received from peer ipv4:127.0.0.1:8098","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b","grpc_status":3}"
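The `debug_error_string` in the log names the peer that the artifact-staging call actually reached. The hypothetical stdlib-only helper below (`failing_peer` is an illustrative name) pulls that peer out of the JSON and checks whether it is a loopback address. For the string above it yields 127.0.0.1:8098, i.e. the Airflow pod's own loopback interface rather than the beam-spark-job-server Service. One plausible reading, not confirmed by the post, is that the job server advertises its artifact endpoint as localhost, which only works when both containers share a pod's network namespace. The regex only handles the `ipv4:` peer form seen here.

```python
import ipaddress
import json
import re

# debug_error_string copied verbatim from the traceback above.
DEBUG_ERROR_STRING = (
    '{"created":"@1613765341.075846957",'
    '"description":"Error received from peer ipv4:127.0.0.1:8098",'
    '"file":"src/core/lib/surface/call.cc","file_line":1067,'
    '"grpc_message":"Unknown staging token job_b6f49cc2-6732-4ea3-9aef-774e3d22867b",'
    '"grpc_status":3}'
)

# Matches the "peer ipv4:<host>:<port>" fragment of the gRPC description.
PEER_RE = re.compile(r"peer ipv4:(?P<host>[\d.]+):(?P<port>\d+)")


def failing_peer(debug_error_string: str):
    """Return (host, port, is_loopback) for the peer in a gRPC debug_error_string, or None."""
    info = json.loads(debug_error_string)
    match = PEER_RE.search(info.get("description", ""))
    if match is None:
        return None
    host = match.group("host")
    port = int(match.group("port"))
    return host, port, ipaddress.ip_address(host).is_loopback
```

For the logged error, `failing_peer(DEBUG_ERROR_STRING)` returns `("127.0.0.1", 8098, True)`.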
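This indicates an error while transferring the job. If I run the job server in the same pod as Airflow, communication between the two containers works completely; I would like the same behavior with them in different pods.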
1 answer
You need to deploy both containers in one pod.
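A minimal sketch of what "both containers in one pod" could look like, written as a Python function that emits a Deployment as JSON (which `kubectl apply -f -` accepts). Containers in the same pod share a network namespace, so Airflow should be able to use --job_endpoint=localhost:8099 and the artifact service on localhost, matching the working in-pod setup described in the question. The Deployment name `airflow-with-beam` and the Airflow image tag are hypothetical placeholders; a real Airflow container also needs its usual command, environment and volumes, which are omitted here.

```python
import json

# Hypothetical Airflow image tag; substitute the image your cluster already runs.
AIRFLOW_IMAGE = "apache/airflow:2.0.1"
JOB_SERVER_IMAGE = "apache/beam_spark_job_server:2.27.0"


def two_container_deployment(name: str = "airflow-with-beam") -> dict:
    """Build a Deployment whose single pod runs Airflow and the Beam job server side by side."""
    labels = {"app": name}
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": name, "labels": {"app": "airflow-k8s"}},
        "spec": {
            "replicas": 1,
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "containers": [
                        # Airflow container: command/env/volumes omitted (assumption).
                        {"name": "airflow", "image": AIRFLOW_IMAGE},
                        # Job server container, same image and args as the original Deployment.
                        {
                            "name": "beam-spark-job-server",
                            "image": JOB_SERVER_IMAGE,
                            "args": ["--spark-master-url=spark://spark-master:7077"],
                            "ports": [
                                {"containerPort": 8099, "name": "job-server"},
                                {"containerPort": 8098, "name": "artifact"},
                                {"containerPort": 8097, "name": "java-expansion"},
                            ],
                        },
                    ]
                },
            },
        },
    }


if __name__ == "__main__":
    # e.g. python make_manifest.py | kubectl apply -f -
    print(json.dumps(two_container_deployment(), indent=2))
```

The trade-off is the one the question tries to avoid: Airflow and the job server are scheduled, scaled and restarted together, in exchange for localhost addresses that work without extra networking configuration.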