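TL;DR
Is it possible to configure the Beam portable runner with Spark configurations? More precisely, is it possible to configure spark.driver.host
in the portable runner?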
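Motivation
We currently run Airflow in a Kubernetes cluster, and in order to use TensorFlow Extended we need to use Apache Beam. For our use case Spark is the appropriate runner, and since Airflow and TensorFlow are written in Python, we need to use Apache Beam's portable runner (https://beam.apache.org/documentation/runners/spark/#portability).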
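Problem
The portable runner creates the Spark context inside its own container and leaves no room for configuring the driver's DNS name, so the executors inside the worker pods cannot communicate with the driver (the job server).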
Setup
Following the Beam documentation, the job server runs in the same pod as Airflow so that the two containers can use the pod-local network. Job server config:
- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]
Job server/Airflow service:
apiVersion: v1
kind: Service
metadata:
  name: airflow-scheduler
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: airflow-scheduler
  ports:
    - port: 8793
      protocol: TCP
      targetPort: 8793
      name: scheduler
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion
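Ports 8097, 8098 and 8099 belong to the job server, 8793 to Airflow, and 7077 to the Spark master.
Development/Errors
When running a simple Beam example from the Airflow container:
python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
I get the following response on the Airflow pod: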
Defaulting container name to airflow-scheduler.
Use 'kubectl describe pod/airflow-scheduler-local-f685b5bc7-9d7r6 -n airflow-main-local' to see all of the containers in this pod.
airflow@airflow-scheduler-local-f685b5bc7-9d7r6:/opt/airflow$ python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:35837
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
with Pipeline() as p:
p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
And the worker logs:
21/02/19 19:50:00 INFO Worker: Asked to launch executor app-20210219194804-0000/47 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:00 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing view acls groups to:
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls groups to:
21/02/19 19:50:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:00 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "47" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:02 INFO Worker: Executor app-20210219194804-0000/47 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 47
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=47)
21/02/19 19:50:02 INFO Worker: Asked to launch executor app-20210219194804-0000/48 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:02 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing view acls groups to:
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls groups to:
21/02/19 19:50:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "48" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:04 INFO Worker: Executor app-20210219194804-0000/48 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 48
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=48)
21/02/19 19:50:04 INFO Worker: Asked to launch executor app-20210219194804-0000/49 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:04 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing view acls groups to:
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls groups to:
21/02/19 19:50:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:04 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "49" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
.
.
.
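As we can see, the executors keep exiting. As far as I know, this is caused by the missing communication between the executor and the driver (the job server in this case). Note also that "--driver-url" is built from the driver pod name and the random port given in "-Dspark.driver.port". Since we cannot define the name of a service there, the workers try to reach the driver under its original pod name and on a randomly generated port. Because this configuration comes from the driver, changing the default conf files on the worker/master has no effect. Following another answer as an example, I tried setting the environment variable SPARK_PUBLIC_DNS,
but this did not change anything in the worker logs.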
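To make the diagnosis above easier to check, here is a minimal sketch that pulls the driver host and port out of an executor launch command and tests whether that host name resolves. The function names `driver_address` and `resolves` are hypothetical helpers, not part of Spark or Beam, and the check only tells you something if it is run from inside a worker pod.

```python
import shlex
import socket
from urllib.parse import urlsplit


def driver_address(launch_command: str) -> tuple:
    """Extract (host, port) from the --driver-url argument of a launch command."""
    args = shlex.split(launch_command)
    # The value follows the flag: spark://CoarseGrainedScheduler@host:port
    url = args[args.index("--driver-url") + 1]
    parts = urlsplit(url)
    return parts.hostname, parts.port


def resolves(host: str) -> bool:
    """Return True if the host name resolves from the machine running this code."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False
```

For the launch commands in the worker log above, `driver_address` returns `("airflow-scheduler-local-f685b5bc7-9d7r6", 44447)`. If `resolves` is False for that host on a worker pod, the executors have no way to call back to the driver.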
Obs
Running a Spark job directly in Kubernetes:
kubectl run spark-base --rm -it --labels="app=spark-client" --image bde2020/spark-base:2.4.5-hadoop2.7 -- bash ./spark/bin/pyspark --master spark://spark-master:7077 --conf spark.driver.host=spark-client
with the service:
apiVersion: v1
kind: Service
metadata:
  name: spark-client
spec:
  selector:
    app: spark-client
  clusterIP: None
I get a fully working PySpark shell. If I omit the --conf parameter, I get the same behavior as in the first setup (executors exiting indefinitely):
21/02/19 20:21:02 INFO Worker: Executor app-20210219202050-0002/4 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 4
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219202050-0002, execId=4)
21/02/19 20:21:02 INFO Worker: Asked to launch executor app-20210219202050-0002/5 for Spark shell
21/02/19 20:21:02 INFO SecurityManager: Changing view acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing view acls groups to:
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls groups to:
21/02/19 20:21:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 20:21:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=46161" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-base:46161" "--executor-id" "5" "--hostname" "172.18.0.20" "--cores" "1" "--app-id" "app-20210219202050-0002" "--worker-url" "spark://Worker@172.18.0.20:45151"
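The difference between the two runs can be sketched with a simplified model of how the advertised driver address is chosen. This is an illustration under the assumption that the driver falls back to the pod's own host name when spark.driver.host is not set, which is what the logs suggest. It is not Spark's actual code, and `driver_url` is a hypothetical name.

```python
def driver_url(conf: dict, pod_hostname: str, port: int) -> str:
    """Simplified model of the address the driver hands to the executors."""
    # Without spark.driver.host, the pod's own host name is advertised.
    host = conf.get("spark.driver.host", pod_hostname)
    return f"spark://CoarseGrainedScheduler@{host}:{port}"


# Without --conf: the pod name, which the worker pods cannot resolve.
print(driver_url({}, "spark-base", 46161))
# With --conf spark.driver.host=spark-client: the headless service name.
print(driver_url({"spark.driver.host": "spark-client"}, "spark-base", 46161))
```

The first call reproduces the URL seen in the failing log; the second shows the name that the headless spark-client service makes resolvable.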
1 Answer
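Depending on your deployment requirements, I have three solutions to choose from. In order of difficulty:
1. Use the Spark "uber jar" job server. This starts an embedded job server inside the Spark master instead of using a standalone job server in a container. This would simplify your deployment a lot, since you would not need to start the beam_spark_job_server container at all.
2. You can pass the properties through a Spark configuration file. Create the Spark configuration file and add spark.driver.host and whatever other properties you need. In the docker run command, mount that configuration file into the container and set the SPARK_CONF_DIR environment variable to point to that directory.
3. If neither of those works for you, you can alternatively build your own customized version of the job server container. Pull the Beam source from GitHub. Check out the release branch you want to use (e.g. git checkout origin/release-2.28.0). Modify the entrypoint spark-job-server.sh and set -Dspark.driver.host=x there. Then build the container using ./gradlew :runners:spark:job-server:container:docker -Pdocker-repository-root="your-repo" -Pdocker-tag="your-tag".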
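A minimal sketch of the first two options, under stated assumptions. For option 1 it assembles a wordcount invocation using the Beam Python SDK's SparkRunner options (`--spark_submit_uber_jar`, `--spark_master_url`); check the Beam Spark runner documentation for what this mode requires of your cluster. For option 2 it writes a spark-defaults.conf into a directory that SPARK_CONF_DIR could point to. The helper names and the host name `beam-job-server` are hypothetical; the latter stands for a headless service in front of the job server pod, analogous to the spark-client service in the question.

```python
import shlex
import tempfile
from pathlib import Path


def uber_jar_command(master_url: str, output: str) -> list:
    """Option 1: wordcount submitted through the embedded (uber jar) job server."""
    return [
        "python", "-m", "apache_beam.examples.wordcount",
        f"--output={output}",
        "--runner=SparkRunner",
        "--spark_submit_uber_jar",
        f"--spark_master_url={master_url}",
        "--environment_type=LOOPBACK",
    ]


def write_spark_defaults(conf_dir: Path, properties: dict) -> Path:
    """Option 2: write spark-defaults.conf into the directory for SPARK_CONF_DIR."""
    conf_dir.mkdir(parents=True, exist_ok=True)
    path = conf_dir / "spark-defaults.conf"
    # spark-defaults.conf holds one whitespace-separated "key value" pair per line.
    lines = [f"{key} {value}" for key, value in sorted(properties.items())]
    path.write_text("\n".join(lines) + "\n")
    return path


# Print the option 1 command line for the master URL used in the question.
print(shlex.join(uber_jar_command("spark://spark-master:7077", "./data_test/")))

# Write an option 2 config file; "beam-job-server" is a hypothetical service name.
conf_path = write_spark_defaults(
    Path(tempfile.mkdtemp()), {"spark.driver.host": "beam-job-server"}
)
print(conf_path.read_text(), end="")
```

The generated file would then be mounted into the job server container, with SPARK_CONF_DIR set to the mount directory as described in option 2.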