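I would like help with an error I get when running a spark-submit job from an Airflow container. I built two images, one for Airflow and one for Spark, and I run a docker-compose file for each application. I then run docker-compose and configure the Spark connection in the Airflow UI by defining the host with the master node's URI in the connection settings, making sure the connection ID matches the one used in the DAG. Reading the logs, the DAG seems unable to locate the JAR file responsible for connecting to GCS (gcs-connector-hadoop3-2.2.5.jar). I cannot determine whether this is a path issue (relative/absolute), a permissions issue, or something else entirely.
Here is the Spark job: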
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

if __name__ == '__main__':
    conf = SparkConf() \
        .setMaster('local') \
        .setAppName('test') \
        .set("spark.jars", "/spark-lib/gcs-connector-hadoop3-2.2.5.jar") \
        .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
        .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", './.google/credentials/google_credentials.json')

    sc = SparkContext(conf=conf.set("spark.files.overwrite", "true"))

    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", './.google/credentials/google_credentials.json')
    hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

    spark = SparkSession.builder \
        .config(conf=sc.getConf()) \
        .getOrCreate()

    path = "gs://tfl-cycling/pq/"
    df_test = spark.read.option("recursiveFileLookup", "true").parquet(path)
    df_test.printSchema()
DAG:
from datetime import timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
import pendulum
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql import types
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import functions as F
import pandas as pd
import datetime

default_args = {
    'owner': 'airflow',
    'retry_delay': timedelta(minutes=5)
}

spark_dag = DAG(
    dag_id="test_1",
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(minutes=60),
    description='use case of sparkoperator in airflow',
    start_date=pendulum.today('UTC').add(days=-1)
)

Etl = SparkSubmitOperator(
    application="/opt/airflow/dags/example.py",
    conn_id='spark_default',  # 'spark_local'
    task_id='spark_submit_task_load',
    dag=spark_dag
)

Etl
Error message:
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Utils: Successfully started service 'SparkUI' on port 4040.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 ERROR SparkContext: Failed to add ./../gcs-connector-hadoop3-2.2.5.jar to Spark environment
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - java.io.FileNotFoundException: Jar /opt/gcs-connector-hadoop3-2.2.5.jar not found
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.addLocalJarFile$1(SparkContext.scala:1968)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.addJar(SparkContext.scala:2023)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.$anonfun$new$12(SparkContext.scala:507)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.$anonfun$new$12$adapted(SparkContext.scala:507)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:507)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.Gateway.invoke(Gateway.java:238)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/java.lang.Thread.run(Thread.java:829)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Executor: Starting executor ID driver on host 8923728d9f5c
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35925.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO NettyBlockTransferService: Server created on 8923728d9f5c:35925
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManagerMasterEndpoint: Registering block manager 8923728d9f5c:35925 with 434.4 MiB RAM, BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - /home/***/.local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py:317: FutureWarning: Python 3.7 support is deprecated in Spark 3.4.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - warnings.warn("Python 3.7 support is deprecated in Spark 3.4.", FutureWarning)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO SharedState: Warehouse path is 'file:/opt/***/spark-warehouse'.
[2023-04-18, 08:50:06 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:06 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: gs://tfl-cycling/pq/.
[2023-04-18, 08:50:06 UTC] {spark_submit.py:490} INFO - java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
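I have tried setting the spark.jars value to both absolute and relative paths, and I have tried granting additional permissions to read the file. I also tried using "spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5" instead of a "local" file, but I think my implementation there needs more research into how to set the configuration correctly from the script.
Finally, I tried submitting the job directly to the Spark container, and the job shown above (which is more of an example) ran successfully.
My goal for this job is to read Parquet files stored on GCS and run some analysis and transformations with Spark, but first I need to be able to connect to GCS and read the files there.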
1 Answer
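Solved. Although I had copied the JAR file into the image, I had not mounted the volume correctly in the relevant service (airflow-scheduler in this case). Once I added the mount to that service in the docker-compose file, the file was recognized. As a side note, earlier while debugging I had manually copied the file into the container with docker cp. However, although the command succeeded, Airflow still could not access the file, so anyone taking that route would also need to set up a mount for the file or directory manually.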
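To tell a missing mount apart from a permissions problem, one option is a small pre-flight check run inside the same container that executes spark-submit (here the airflow-scheduler service). The sketch below is only an illustration: the helper name check_jar is hypothetical, and the path is the spark.jars value from the question. Note that a path inside a directory the process cannot traverse will also be reported as "missing".

```python
import os


def check_jar(path):
    """Classify a JAR path as seen from the current process/container."""
    # Relative paths resolve against the current working directory,
    # which may differ between the Airflow and Spark containers.
    resolved = os.path.abspath(path)
    if not os.path.exists(resolved):
        return resolved, "missing"
    if not os.path.isfile(resolved):
        return resolved, "not a file"
    if not os.access(resolved, os.R_OK):
        return resolved, "unreadable"
    return resolved, "ok"


if __name__ == "__main__":
    # Path taken from the spark.jars setting in the question.
    resolved, status = check_jar("/spark-lib/gcs-connector-hadoop3-2.2.5.jar")
    print(f"{resolved}: {status}")
```

A result of "missing" from inside the Airflow container, while the same check succeeds in the Spark container, would point to the volume not being mounted in that service rather than to file permissions.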