I'm new to Airflow and am trying to get everything set up.
I'm currently working through a tutorial that gives a basic overview of DAGs and Airflow functionality. I have Airflow set up on a Linux server, and my tutorial DAG shows up in the UI. When I test it in the UI, I keep getting an error saying my DAG failed, but when I open the logs to see why, no error message is populated. Below is the code from the tutorial I'm using.
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
default_args = {
    'owner': 'curtis',
    'start_date': pendulum.today('UTC').add(days=-2),
    'depends_on_past': False,  # if True, this task will not run unless its previous run succeeded; if False it runs regardless
    'email': [''],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'bigquery_test',
    default_args=default_args,
    description='test for bigquery connection',
    # schedule once per day
    schedule='@daily'
    # @once    run once and only once
    # @hourly  run once an hour at the beginning of the hour
    # @daily   run once a day at midnight
    # @weekly  run once a week at midnight on Sunday morning
    # @monthly run once a month at midnight on the first day of the month
    # @yearly  run once a year at midnight on January 1st
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
dag.doc_md = __doc__
t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag
)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }} "
echo "{{ macros.ds.add(ds,7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'parameter i passed in'},
    dag=dag
)
# setting up the dependencies
# this means that t2 will depend on t1
# running successfully in order to run
# t1.set_downstream(t2)
# similar to above, where t3 will depend on t1
# t3.set_upstream(t1)
# the bit shift operator can also be used to chain operations:
# t1 >> t2
# and the upstream dependency with the
# bit shift operator
# t2 << t1
# a list of tasks can also be set as
# dependencies. these operations
# all have the same effect:
# t1.set_downstream([t2, t3])
t1 >> [t2, t3]
# [t2, t3] << t1
1 Answer
I just ran your code and it works, although the t3 task failed with:
jinja2.exceptions.UndefinedError: 'module object' has no attribute 'ds'
But the DAG itself parses fine and is able to run, so I'd say your problem is related to your Python runtime or how you set up your instance. I'd suggest following the official Getting Started guide to configure the project, or using the Astro CLI, which is easier to work with; there you can also set everything up with Docker Compose.
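As for the t3 failure itself: the error points at the `macros.ds.add(ds,7)` call in your template, since Airflow exposes that date helper as `macros.ds_add` rather than `macros.ds.add`. A minimal sketch of the corrected template, with everything else kept as in your code, would be:

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""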
Also, I assume the tutorial starts by teaching the most basic way of setting up a DAG, which is fine. Just FYI: there are cleaner and more modern ways to do this, e.g. using the TaskFlow API (:
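For example, here is a rough sketch of roughly the same DAG written TaskFlow-style with the `@dag`/`@task` decorators. It assumes Airflow 2.4+ (your code already uses the `schedule` argument, so that should hold); the DAG id and function names are just illustrative, not a drop-in replacement for your file:

from datetime import timedelta

import pendulum
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator


@dag(
    dag_id='bigquery_test_taskflow',
    schedule='@daily',
    start_date=pendulum.today('UTC').add(days=-2),
    catchup=False,
    default_args={'owner': 'curtis', 'retries': 1, 'retry_delay': timedelta(minutes=5)},
)
def bigquery_test_taskflow():
    @task
    def print_date():
        # plain Python instead of a BashOperator running `date`
        print(pendulum.now('UTC'))

    # classic operators can still be mixed in with TaskFlow tasks
    sleep = BashOperator(task_id='sleep', bash_command='sleep 5')

    print_date() >> sleep


bigquery_test_taskflow()

The nice part is that dependencies (and XCom passing, if you need it) come from calling the decorated task functions, rather than from wiring operators together by hand.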