获取Azure气流运算符的等效DataprocCreateBatchOperator运算符

o2g1uqev  于 2023-01-21  发布在  其他
关注(0)|答案(2)|浏览(114)

我有3个运营商从气流进口。供应商。谷歌。云。运营商。数据处理

  1. DataprocCreateBatchOperator
  2. DataprocDeleteBatchOperator
  3. DataprocGetBatchOperator
    需要相同类型的Azure操作员。可以请人看看这个或我必须创建一个新的操作员?
56lgkhnf

56lgkhnf1#

我认为,与Dataproc操作符等效的apache-airflow-providers-microsoft-azure提供程序包应该是Azure Synapse Operators
具体来说,AzureSynapseRunSparkBatchOperator允许用户“在Synapse Analytics中执行Spark应用程序”。
如果你在Azure数据块上运行Spark作业,还有几个Databricks Operators可能会有所帮助。
下面是一个使用AzureSynapseHookPythonOperator示例(通过Taskflow API),注意我没有测试它,我只是用它来演示它可能的样子:

@task()
def cancel_spark_job(job_id: str):
    hook = AzureSynapseHook(azure_synapse_conn_id="your_conn_id")
    if hook.wait_for_job_run_status(job_id, expected_statuses=("error", "dead", "killed"):
        hook.cancel_job_run(job_id)

此任务将等待spark job进入“error”、“dead”、“killed”或超时状态。如果spark job进入前面提到的状态之一,它将取消该作业。同样,这只是演示如何在PythonOperator中使用AzureSynapseHook,我不确定它是否工作,或者以这种方式实现它是否有意义。

l7wslrjt

l7wslrjt2#

@玛兹伦·托孙
对于我的代码中的GCP,DataprocCreateBatchOperator的用法如下:-

create_batch = DataprocCreateBatchOperator(
    task_id="CREATE_BATCH",
    batch={
            "pyspark_batch": {
                "main_python_file_uri": f"gs://{ARTIFACT_BUCKET}/spark-jobs/main.py",
                "args": app_args,
                "python_file_uris": [
                    f"gs://{ARTIFACT_BUCKET}/spark-jobs/jobs.zip",
                    f"gs://{ARTIFACT_BUCKET}/spark-jobs/libs.zip"
                ],
                "jar_file_uris": test_jars,
                "file_uris": [
                    f"gs://{ARTIFACT_BUCKET}/config/params.yaml"
                ]
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {}
                }
            }
        },
        region=REGION,batch_id=batch_id_str,)

相关问题