postgresql - 'datetime.datetime' object has no attribute '__module__' when returning results from the Postgres hook in Airflow

carvr3hs posted on 2023-05-06 in PostgreSQL

Given the following DAG definition:

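from contextlib import closing

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

def get_data():
    sql_stmt = "SELECT * FROM table"
    pg_hook = PostgresHook(
        postgres_conn_id='postgres_connection',
        schema='postgres'
    )
    # Close the cursor and the connection once the rows have been fetched
    with closing(pg_hook.get_conn()) as pg_conn, closing(pg_conn.cursor()) as cursor:
        cursor.execute(sql_stmt)
        # Rows are tuples; timestamp columns come back as datetime.datetime objects
        return cursor.fetchall()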

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['etl'])
def etl():
    @task()
    def extract():
        # The return value is pushed to XCom, which JSON-encodes it (XCom.serialize_value)
        return get_data()

    data = extract()

etl_dag = etl()

Running the task in test mode (airflow tasks test etl extract) fails with the following error:

Traceback (most recent call last):
  File "<path>/Airflow/venv/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 576, in task_test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1677, in run
    self._run_raw_task(
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1383, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1529, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1596, in _execute_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=xcom_value, session=session)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2298, in xcom_push
    XCom.set(
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 234, in set
    value = cls.serialize_value(
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'datetime.datetime' object has no attribute '__module__'

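The error is triggered by the timestamp column in the database table; selecting only non-timestamp columns works as expected.
Is there a way to change the type of the fields in the returned data, or to make XCom serialize these fields correctly?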
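One way to change the field types, sketched here on the assumption that ISO-8601 strings are acceptable to downstream tasks, is to convert every date/time value in the rows returned by cursor.fetchall() before the task returns them. psycopg2 returns timestamp columns as datetime.datetime objects; turning them into strings keeps the XCom encoder off its datetime path. The helper name `to_xcom_safe` is hypothetical, not part of Airflow.

```python
import datetime
from typing import Any, Iterable, List


def to_xcom_safe(rows: Iterable[tuple]) -> List[list]:
    """Hypothetical helper: replace date/time values in DB rows with ISO-8601 strings."""

    def convert(value: Any) -> Any:
        # datetime.datetime subclasses datetime.date, so this check covers both
        if isinstance(value, (datetime.date, datetime.time)):
            return value.isoformat()
        return value

    return [[convert(v) for v in row] for row in rows]


rows = [(1, datetime.datetime(2023, 5, 6, 12, 30)), (2, None)]
print(to_xcom_safe(rows))
# [[1, '2023-05-06T12:30:00'], [2, None]]
```

Inside the DAG, extract would then return to_xcom_safe(get_data()). Casting in SQL instead (for a hypothetical column ts_column, SELECT ts_column::text ...) gives the same result without Python post-processing.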

Answer by 8i9zcol2:

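I had the same problem. I run Airflow with Docker: with the image apache/airflow:2.5.3-python3.10 I got this error, and I solved it by moving to apache/airflow:2.6.0-python3.10. Since both images use the same Python version (3.10), the fix comes from the Airflow upgrade rather than from Python. The failing line in the traceback (airflow/utils/json.py) reads o.__module__ on the datetime value itself; datetime.datetime is implemented in C, and its instances do not expose __module__ (the class does: datetime.datetime.__module__ is 'datetime'), so the lookup raises AttributeError.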
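The failing lookup can be illustrated with plain json: reading __module__ and __qualname__ from the class, type(o), instead of from the instance works for datetime values. The encoder below is a hypothetical sketch of that idea (the names `class_path` and `IsoDatetimeEncoder` are made up), not Airflow's actual XComEncoder code.

```python
import datetime
import json
from typing import Any


def class_path(o: Any) -> str:
    # Read the names from the class, not the instance: instances of
    # C-implemented types such as datetime.datetime may not expose __module__.
    cls = type(o)
    return cls.__module__ + "." + cls.__qualname__


class IsoDatetimeEncoder(json.JSONEncoder):
    """Hypothetical encoder: date/time values become tagged ISO-8601 strings."""

    def default(self, o: Any) -> Any:
        if isinstance(o, (datetime.date, datetime.time)):
            return {"__class__": class_path(o), "value": o.isoformat()}
        # Anything else falls back to the standard TypeError
        return super().default(o)


payload = [(1, datetime.datetime(2023, 5, 6, 12, 30))]
print(json.dumps(payload, cls=IsoDatetimeEncoder))
# [[1, {"__class__": "datetime.datetime", "value": "2023-05-06T12:30:00"}]]
```

In practice the Airflow upgrade is still the simpler fix; the sketch only shows why an instance-level __module__ lookup fails where a class-level one does not.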