python-3.x 我怎样才能只在每次dag运行时调用BigQuery表,而不是在每次Airflow刷新时调用?

3htmauhk  于 2023-02-10  发布在  Python
关注(0)|答案(1)|浏览(105)

我有一个DAG,它查询一个表,从中提取数据,并且还使用ShortCircuitOperator检查DAG是否需要运行,也是基于BQ表名的,问题是当前每次Airflow刷新时都会查询该表,该表很小(每个查询小于1 kb),但我担心随着规模的扩大,成本会越来越高。是否有办法在每个DAG运行时只查询此表?
下面是一段代码片段,展示了正在发生的事情:

client = bigquery.Client()
def create_query_lists(list_type):
    query_job = client.query(
        """
        SELECT filename
        FROM `requests`
        """
    )
    results = query_job.result()
    results_list = []
    for row in results:
        results_list.append(row.filename)
    return results_list

   def check_contents():
        if len(create_query_lists()) == 0:
            raise ValueError('Nothing to do')
            return False
        else:
            print("There's stuff to do")
            return True
#Create task to check if data being pulled is empty, if so fail so other tasks don't run
    check_list = ShortCircuitOperator(
        task_id="check_column_not_empty",
        provide_context=True,
        python_callable=check_list_contents
        )
    
check_list #do subsequent tasks which use the same function
5hcedyr0

5hcedyr01#

如果我正确理解了您的需求,您希望仅在SQL查询的结果不为空时执行任务。
在这种情况下,也可以使用BranchPythonOperator,例如:

import airflow
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from google.cloud import bigquery

client = bigquery.Client()

def create_query_lists(list_type):
    query_job = client.query(
        """
        SELECT filename
        FROM `requests`
        """
    )
    results = query_job.result()
    results_list = []
    for row in results:
        results_list.append(row.filename)
    return results_list

def check_contents():
    if len(create_query_lists()) == 0:
        return 'KO'
    else:
        return 'OK'

with airflow.DAG(
        "your_dag",
        schedule_interval=None) as dag:
    branching = BranchPythonOperator(
        task_id='file_exists',
        python_callable=check_contents,
        provide_context=True,
        op_kwargs={
            'param': 'param'
        },
        dag=dag
    )

    ok = DummyOperator(task_id='OK', dag=dag)
    ko = DummyOperator(task_id='KO', dag=dag)

    fake_task = DummyOperator(task_id='fake_task', dag=dag)

    (branching >>
     ok >>
     fake_task)

    branching >> ko
  • BranchPythonOperator执行查询,如果结果不为空,则返回OK,否则返回KO
  • 我们创建2个DummyOperator,一个用于OK,另一个用于KO(2个分支)
  • 根据结果,我们将转到OKKO分支
  • KO分支将完成DAG,而不执行其他任务
  • OK分支将继续执行DAG,执行本示例中的后续任务(fake_task

相关问题