How to run a Docker image on a GCP Compute Engine instance using Apache Airflow

k10s72fa asked on 2023-04-29 in Docker

I am trying to create an Airflow DAG from which I want to spin up a Compute Engine instance running a Docker image stored in Google Container Registry.
In other words, I want to replicate `gcloud compute instances create-with-container` with Airflow DAGs using gcloud operators. I searched for Airflow operators for this kind of operation, but could not find any way to make it work.
Possible references:

  1. https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/compute.html
  2. https://cloud.google.com/composer/docs/connect-gce-vm-sshoperator

iszxjhcz1#

A simple and clean solution to run a pre-made container on a VM from Airflow can consist of the following 3 steps:
1. create a brand-new VM (via a BashOperator) with a startup script that pulls/runs the container and shuts the VM down when the run is complete;
2. use a PythonSensor to check when the VM is stopped (i.e. the docker run is finished);
3. delete the VM (via a BashOperator), so that the previous steps can be repeated the next time the airflow dag is triggered.
We only need the following bash commands:

bash_cmd = {
    'active_account': \
        'gcloud auth activate-service-account MYCLIENTEMAIL '
        '--key-file=/PATH/TO/MY/JSON/SERVICEACCOUNT',
    'set_project': \
        'gcloud config set project MYPROJECTID',
    'list_vm': \
        'gcloud compute instances list',
    'create_vm': \
        'gcloud compute instances create-with-container VMNAME '
        '--project=MYPROJECTID --zone=MYZONE --machine-type=e2-medium '
        '--image=projects/cos-cloud/global/images/cos-stable-101-17162-40-5 '
        '--boot-disk-size=10GB --boot-disk-type=pd-balanced '
        '--boot-disk-device-name=VMNAME '
        '--container-image=eu.gcr.io/MYPROJECTID/MYCONTAINER --container-restart-policy=always '
        '--labels=container-vm=cos-stable-101-17162-40-5 --no-shielded-secure-boot '
        '--shielded-vtpm --shielded-integrity-monitoring '
        '--metadata startup-script="#!/bin/bash\n sleep 10\n sudo useradd -m bob\n sudo -u bob docker-credential-gcr configure-docker\n sudo usermod -aG docker bob\n sudo -u bob docker run eu.gcr.io/MYPROJECTID/MYCONTAINER\n sudo poweroff" ',
    'delete_vm': \
        'gcloud compute instances delete VMNAME --zone=MYZONE --delete-disks=boot',
}

`active_account` and `set_project` are used respectively to activate the service account and to set the correct working project (in which we want to run the VM). This is required when Airflow runs outside the GCP project in which the VM is instantiated. It is also important that the service account used has Compute Engine permissions. The container image to run must be located in the Container Registry of the same project in which the VM is instantiated.
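In the DAG tasks further down, these commands are joined with `&&` so that each one runs only if the previous one succeeded. A minimal sketch of that chaining, using the placeholder names from the dict above (they are not real resources):

```python
# Sketch of how the gcloud commands are chained for a BashOperator.
# MYCLIENTEMAIL / MYPROJECTID are placeholders from the answer, not real resources.
bash_cmd = {
    'active_account':
        'gcloud auth activate-service-account MYCLIENTEMAIL '
        '--key-file=/PATH/TO/MY/JSON/SERVICEACCOUNT',
    'set_project': 'gcloud config set project MYPROJECTID',
    'list_vm': 'gcloud compute instances list',
}

# '&&' aborts the chain early, e.g. if service-account activation fails.
full_cmd = " && ".join(
    [bash_cmd['active_account'], bash_cmd['set_project'], bash_cmd['list_vm']]
)
print(full_cmd)
```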
`list_vm` returns the list of existing VMs in the project, together with their specs and status (RUNNING/TERMINATED).
`create_vm` creates the VM, with docker attached so it runs the image from the Container Registry. The creation command can be customized to your needs. Importantly, you must add `--metadata startup-script`, which includes the docker run and the shutdown of the VM when docker finishes running. (To see how a startup script is generated, see here).
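The one-line startup script crammed into the `--metadata` flag is easier to review when assembled from its individual lines. A sketch of how it could be built (the image name is the placeholder from the answer, not a real resource):

```python
# Build the startup script passed via --metadata in a readable way.
# eu.gcr.io/MYPROJECTID/MYCONTAINER is the placeholder image used above.
startup_script = "\n".join([
    "#!/bin/bash",
    "sleep 10",                                            # let boot settle
    "sudo useradd -m bob",                                 # non-root user for docker
    "sudo -u bob docker-credential-gcr configure-docker",  # authenticate to GCR
    "sudo usermod -aG docker bob",
    "sudo -u bob docker run eu.gcr.io/MYPROJECTID/MYCONTAINER",
    "sudo poweroff",  # TERMINATED status is what the sensor waits for
])
metadata_flag = f'--metadata startup-script="{startup_script}"'
print(metadata_flag)
```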
`delete_vm` simply deletes the VM created by `create_vm`. (Note that `gcloud compute instances delete` asks for confirmation, so in a non-interactive BashOperator run you may want to add `--quiet`.)
All these commands can be combined to work together in an Airflow DAG as follows:

import re
import datetime
import subprocess

from airflow import models
from airflow.sensors.python import PythonSensor
from airflow.operators.bash import BashOperator

VMNAME = 'VMNAME'  # same placeholder name used in bash_cmd above

def vm_run_check():
    """List the project's VMs and check whether VMNAME has TERMINATED."""
    
    finish_run = False
    output = subprocess.check_output(
        bash_cmd['active_account'] + " && " + \
        bash_cmd['set_project'] + " && " + \
        bash_cmd['list_vm'], 
        shell=True
    )
    output = output.decode("utf-8").split("\n")[:-1]

    machines = []
    for i in range(1,len(output)):
        m = {}
        for match in re.finditer(r"([A-Z_]+)( +)?", output[0]+" "*10):
            span = match.span()
            m[match.group().strip()] = output[i][span[0]:span[1]].strip()
        machines.append(m)
    machines = {m['NAME']:m for m in machines}
    
    if VMNAME in machines:
        if machines[VMNAME]['STATUS'] == 'TERMINATED':
            finish_run = True
    
    return finish_run
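The fixed-width parsing inside `vm_run_check` (column boundaries taken from the header line) can be exercised locally on a canned `gcloud compute instances list` output. A sketch, with a made-up sample output and some columns trimmed for brevity:

```python
import re

def parse_gcloud_table(lines):
    """Parse fixed-width `gcloud compute instances list` output.

    Mirrors the approach used in vm_run_check: the header line defines
    the column spans, and each data row is sliced at those spans."""
    header, *rows = lines
    machines = []
    for row in rows:
        m = {}
        for match in re.finditer(r"([A-Z_]+)( +)?", header + " " * 10):
            start, end = match.span()
            m[match.group().strip()] = row[start:end].strip()
        machines.append(m)
    return {m["NAME"]: m for m in machines}

# Hypothetical sample output (columns trimmed for brevity):
sample = [
    "NAME        ZONE            STATUS",
    "vm-test     europe-west1-b  TERMINATED",
]
machines = parse_gcloud_table(sample)
print(machines["vm-test"]["STATUS"])  # -> TERMINATED
```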

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

with models.DAG(
        'MYDAGNAME',
        catchup=False,
        default_args=default_args,
        start_date=datetime.datetime.now() - datetime.timedelta(days=3),
        schedule_interval='0 4 * * *',  # every day at 04:00 AM UTC
) as dag:
    
    
    create_vm = BashOperator(
           task_id="create_vm", 
           bash_command = bash_cmd['active_account'] + " && " + \
                            bash_cmd['set_project'] + " && " + \
                            bash_cmd['create_vm']
    )
    
    sensor_vm_run = PythonSensor(
        task_id="sensor_vm_run",
        python_callable=vm_run_check,
        poke_interval=60*2,  # check every 2 minutes
        timeout=60*60,  # give up after 1 hour
        soft_fail=True,
        mode="reschedule",
    )
    
    delete_vm = BashOperator(
           task_id="delete_vm", 
           bash_command = bash_cmd['active_account'] + " && " + \
                            bash_cmd['set_project'] + " && " + \
                            bash_cmd['delete_vm']
    )
    
    create_vm >> sensor_vm_run >> delete_vm
