我在GCP Dataproc上有一个可用的pyspark作业,将在气流上触发,如下所示:
config = help.loadJSON("batch/config_file")
MY_PYSPARK_JOB = {
"reference": {"project_id": "my_project_id"},
"placement": {"cluster_name": "my_cluster_name"},
"pyspark_job": {
"main_python_file_uri": "gs://file/loc/my_spark_file.py"]
"properties": config["spark_properties"]
"args": <TO_BE_ADDED>
},
}
字符串
我需要提供命令行参数到这个pyspark作业,如下所示[这是我如何从命令行运行我的pyspark作业]:
spark-submit gs://file/loc/my_spark_file.py --arg1 val1 --arg2 val2
型
我使用**“sparkparser”为我的pyspark作业提供参数。因此,arg 1是键,val 1是上面spark-submit命令中的值。
如何在上面定义的“MY_PYSPARK_JOB”中定义“args”**参数[相当于我的命令行参数]?
2条答案
按热度按时间gzszwxb41#
我终于解决了这个难题。如果我们使用ConfigParser,则必须如下所示指定密钥[无论参数是作为命令传递还是在airflow上传递]:
字符串
在airflow中,参数以Sequence[str]的形式传递(正如@Betjens在下面提到的),每个参数定义如下:
型
因此,根据我的要求,命令行参数定义如下:
型
dy1byipe2#
你必须传递一个
Sequence[str]
。如果你检查DataprocSubmitJobOperator,你会看到参数job
实现了一个类google.cloud.dataproc_v1.types.Job。字符串
因此,在关于作业类型
pySpark
(即google.cloud.dataproc_v1.types.PySparkJob)的部分中:args Sequence[str]可选。传递给驱动程序的参数。不要包括可以设置为作业属性的参数,如
--conf
,因为可能会发生冲突,导致不正确的作业提交。