我有3个运营商从气流进口。供应商。谷歌。云。运营商。数据处理
DataprocCreateBatchOperator
DataprocDeleteBatchOperator
DataprocGetBatchOperator
56lgkhnf1#
我认为,与Dataproc操作符等效的apache-airflow-providers-microsoft-azure提供程序包应该是Azure Synapse Operators。具体来说,AzureSynapseRunSparkBatchOperator允许用户“在Synapse Analytics中执行Spark应用程序”。如果你在Azure数据块上运行Spark作业,还有几个Databricks Operators可能会有所帮助。下面是一个使用AzureSynapseHook的PythonOperator示例(通过Taskflow API),注意我没有测试它,我只是用它来演示它可能的样子:
apache-airflow-providers-microsoft-azure
AzureSynapseRunSparkBatchOperator
AzureSynapseHook
PythonOperator
@task() def cancel_spark_job(job_id: str): hook = AzureSynapseHook(azure_synapse_conn_id="your_conn_id") if hook.wait_for_job_run_status(job_id, expected_statuses=("error", "dead", "killed"): hook.cancel_job_run(job_id)
此任务将等待spark job进入“error”、“dead”、“killed”或超时状态。如果spark job进入前面提到的状态之一,它将取消该作业。同样,这只是演示如何在PythonOperator中使用AzureSynapseHook,我不确定它是否工作,或者以这种方式实现它是否有意义。
l7wslrjt2#
@玛兹伦·托孙对于我的代码中的GCP,DataprocCreateBatchOperator的用法如下:-
create_batch = DataprocCreateBatchOperator( task_id="CREATE_BATCH", batch={ "pyspark_batch": { "main_python_file_uri": f"gs://{ARTIFACT_BUCKET}/spark-jobs/main.py", "args": app_args, "python_file_uris": [ f"gs://{ARTIFACT_BUCKET}/spark-jobs/jobs.zip", f"gs://{ARTIFACT_BUCKET}/spark-jobs/libs.zip" ], "jar_file_uris": test_jars, "file_uris": [ f"gs://{ARTIFACT_BUCKET}/config/params.yaml" ] }, "environment_config": { "peripherals_config": { "spark_history_server_config": {} } } }, region=REGION,batch_id=batch_id_str,)
2条答案
按热度按时间56lgkhnf1#
我认为,与Dataproc操作符等效的
apache-airflow-providers-microsoft-azure
提供程序包应该是Azure Synapse Operators。具体来说,
AzureSynapseRunSparkBatchOperator
允许用户“在Synapse Analytics中执行Spark应用程序”。如果你在Azure数据块上运行Spark作业,还有几个Databricks Operators可能会有所帮助。
下面是一个使用
AzureSynapseHook
的PythonOperator
示例(通过Taskflow API),注意我没有测试它,我只是用它来演示它可能的样子:此任务将等待spark job进入“error”、“dead”、“killed”或超时状态。如果spark job进入前面提到的状态之一,它将取消该作业。同样,这只是演示如何在
PythonOperator
中使用AzureSynapseHook
,我不确定它是否工作,或者以这种方式实现它是否有意义。l7wslrjt2#
@玛兹伦·托孙
对于我的代码中的GCP,DataprocCreateBatchOperator的用法如下:-