python-3.x Airflow UI DAGs keep failing

pgky5nke asked on 2023-03-31 in Python

I'm new to Airflow and trying to get everything set up.
I'm currently following a tutorial that gives a basic overview of DAGs and Airflow functionality. I have Airflow set up on a Linux server and got the tutorial DAG to show up in the UI. When I test it in the UI, I keep getting an error saying my DAG failed. When I look at the logs to see why it failed, no error message is populated. Below is the code from the tutorial I am using.

from datetime import timedelta
import airflow 
from airflow import DAG 
from airflow.operators.bash import BashOperator
import pendulum

default_args = {
    'owner' : 'curtis',
    'start_date' : pendulum.today('UTC').add(days= -2),
    'depends_on_past' : False, #if set to True, this task will not run when its previous run failed; if False it runs regardless
    'email' :  [''], 
    'email_on_failure' : True, 
    'email_on_retry' : True, 
    'retried' : 1, 
    'retry_delay' : timedelta(minutes = 5)  
}

dag = DAG(
    'bigquery_test', 
    default_args = default_args, 
    description = 'test for bigquery connection', 
    #schedule once per day
    schedule = '@daily'
    # @once schedule once and only once
    # @hourly run once an hour at the beginning of the hour
    # @daily run once a day at midnight
    # @weekly run once a week at midnight on sunday morning
    # @monthly run once a month at midnight on the first day of the month
    # @yearly run once a year at midnight of january 1st
)

#t1, t2 and t3 are examples of tasks created by instantiating operators 
t1 = BashOperator(
    task_id = 'print_date', 
    bash_command = 'date', 
    dag = dag 
)

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""

dag.doc_md = __doc__

t2 = BashOperator(
    task_id = 'sleep', 
    depends_on_past = False, 
    bash_command = 'sleep 5', 
    dag = dag
)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }} "
    echo "{{ macros.ds.add(ds,7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id = 'templated', 
    depends_on_past = False, 
    bash_command = templated_command, 
    params = {'my_param': 'parameter i passed in'},
    dag = dag
)

#setting up the dependencies
#this means that t2 will depend on t1
#running successfully in order to run
#t1.set_downstream(t2)

#similar to above where t2 will depend on t1
#t3.set_upstream(t1)

#the bit shift operator can also be used to chain operations: 
#t1>>t2

#and the upstream dependency with the 
#bit shift operator
#t2<<t1

#a list of tasks can also be set as
#dependencies. these operations
#all have the same effect: 

#t1.set_downstream([t2,t3])
t1 >> [t2,t3]
#[t2,t3] << t1

rlcwz9us #1

I just ran your code and it works; the t3 task fails, though:
jinja2.exceptions.UndefinedError: 'module object' has no attribute 'ds'
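
For reference, that error points at the macros.ds.add(ds,7) call in templated_command: the built-in Airflow macro is macros.ds_add, not macros.ds.add. A minimal corrected version of that template string (assuming the intent is to print the execution date shifted by 7 days) would be:

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""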
But the DAG itself looks fine and is able to run. I'd say this has to do with your Python runtime or how you set up your instance. I'd suggest following the official Getting Started guide to configure the project, or using the Astro CLI, which is easier to work with; there you can also set everything up with Docker Compose.
Also, I guess the tutorial starts by teaching the most basic way of setting up a DAG, which is fine. Just FYI, there are cleaner and more modern ways to do this, for example using the TaskFlow API :)
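
For comparison, here is a minimal TaskFlow-style sketch (just an illustration, assuming a recent Airflow 2.x where the schedule argument and the decorators are available; it is not a drop-in replacement for the tutorial DAG above):

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule='@daily', start_date=datetime(2023, 3, 1), catchup=False)
def bigquery_test_taskflow():

    @task
    def print_date():
        # plain Python instead of a BashOperator calling the date command
        print(datetime.utcnow().isoformat())

    @task
    def sleep_five_seconds():
        import time
        time.sleep(5)

    # same dependency idea as t1 >> t2 above
    print_date() >> sleep_five_seconds()


bigquery_test_taskflow()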
