postgresql 气流ti.xcom_pull()返回无

fnvucqvd  于 2023-03-08  发布在  PostgreSQL
关注(0)|答案(1)|浏览(145)

我已经连接到本地Postgres数据库。我正在尝试用get_titanic_data函数从"task_get_titanic_data"中的表中提取一些行,并通过第二个任务task_process_titanic_dataprocess_titanic_data应用一些更改。

import pandas as pd
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_titanic_data():
    sql_stmt = 'select * from titanic limit 100'
    pg_hook = PostgresHook(
        postgres_conn_id='postgres_db',
        schema='airflow_database'
    )
    pg_conn = pg_hook.get_conn()
    cursor = pg_conn.cursor()
    cursor.execute(sql_stmt)
    result = cursor.fetchall()
    return result

def process_titanic_data(ti):
    # print(ti.xcom_pull(task_ids=['get_titanic_data']))
    titanic = ti.xcom_pull(task_ids=['get_titanic_data'])
    if not titanic:
        raise Exception('No data')

    titanic = pd.DataFrame(data=titanic, columns=[
                                                    'PassengerId', 'Survived',
                                                    'Pclass', 'Name',
                                                    'Sex', 'Age',
                                                    'SibSp', 'Parch',
                                                    'Fare', 'Embarked'
                            ])

    titanic = titanic[(titanic.Age <= 16) & (titanic.Pclass == 1) & (titanic.Survived == 1)]
    titanic.to_csv(Variable.get('tmp_titanic_csv_location'), index=False)

with DAG(
        dag_id='postgres_db_dag',
        schedule_interval='@daily',
        start_date=datetime(year=2023, month=2, day=21),
        catchup=False
) as dag:
    task_get_titanic_data = PythonOperator(
        task_id='get_titanic_data',
        python_callable=get_titanic_data,
        # do_xcom_push=True
    )

    task_process_titanic_data = PythonOperator(
        task_id='process_titanic_data',
        python_callable=process_titanic_data
    )

当我通过第一个任务获取数据时-一切正常。

问题是:当我启动第二个任务时,从process_titanic_data函数引发异常:

1.为什么?
1.我该怎么补救呢?
我不知道原因。似乎所有变量创建和所有路径是确定的。
尝试进入Task_instance并了解原因...检查了所有变量和路径...尽可能多地搜索它...看到并尝试了这个。一无所获!

ubbxdtey

ubbxdtey1#

“process_titanic_data”很可能没有从XCom提取任何内容,因为它与“get_titanic_data”任务同时运行。如果您不直接使用一个任务的输出作为另一个任务的输入(通过TaskFlow API或其他方式),则需要显式设置相关性。
添加get_titanic_data >> process_titanic_data add到DAG文件的末尾应该可以达到这个目的。
顺便说一句,我也看到了一个“问题”,那就是你是如何拉取XComs的。如果task_ids参数是任务ID的可迭代对象(例如列表等),Airflow假设你想要检索多个XComs,而检索到的XComs将是那些检索到的XComs的列表。假设你不想要“titanic”表中记录列表的列表。请尝试使用titanic = ti.xcom_pull(task_ids='get_titanic_data')

相关问题