我有一个DAG,它查询一个表,从中提取数据,并且还使用ShortCircuitOperator检查DAG是否需要运行,也是基于BQ表名的,问题是当前每次Airflow刷新时都会查询该表,该表很小(每个查询小于1 kb),但我担心随着规模的扩大,成本会越来越高。是否有办法在每个DAG运行时只查询此表?
下面是一段代码片段,展示了正在发生的事情:
client = bigquery.Client()
def create_query_lists(list_type):
query_job = client.query(
"""
SELECT filename
FROM `requests`
"""
)
results = query_job.result()
results_list = []
for row in results:
results_list.append(row.filename)
return results_list
def check_contents():
if len(create_query_lists()) == 0:
raise ValueError('Nothing to do')
return False
else:
print("There's stuff to do")
return True
#Create task to check if data being pulled is empty, if so fail so other tasks don't run
check_list = ShortCircuitOperator(
task_id="check_column_not_empty",
provide_context=True,
python_callable=check_list_contents
)
check_list #do subsequent tasks which use the same function
1条答案
按热度按时间5hcedyr01#
如果我正确理解了您的需求,您希望仅在
SQL
查询的结果不为空时执行任务。在这种情况下,也可以使用
BranchPythonOperator
,例如:BranchPythonOperator
执行查询,如果结果不为空,则返回OK
,否则返回KO
DummyOperator
,一个用于OK
,另一个用于KO
(2个分支)OK
或KO
分支KO
分支将完成DAG,而不执行其他任务OK
分支将继续执行DAG,执行本示例中的后续任务(fake_task