给定以下DAG定义:
from airflow.hooks.postgres_hook import PostgresHook
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
def get_data():
sql_stmt = "SELECT * FROM table"
pg_hook = PostgresHook(
postgres_conn_id='postgres_connection',
schema='postgres'
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
cursor.execute(sql_stmt)
return cursor.fetchall()
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['etl'])
def etl():
@task()
def extract():
return get_data()
data = extract()
etl_dag = etl()
运行测试任务(airflow tasks test etl extract
)时,返回以下错误:
Traceback (most recent call last):
File "<path>/Airflow/venv/bin/airflow", line 8, in <module>
sys.exit(main())
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/__main__.py", line 48, in main
args.func(args)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 52, in command
return func(*args, **kwargs)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 108, in wrapper
return f(*args, **kwargs)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 576, in task_test
ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1677, in run
self._run_raw_task(
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1383, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1529, in _execute_task_with_callbacks
result = self._execute_task(context, task_orig)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1596, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=xcom_value, session=session)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2298, in xcom_push
XCom.set(
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 234, in set
value = cls.serialize_value(
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 627, in serialize_value
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/json/__init__.py", line 234, in dumps
return cls(
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 176, in encode
return super().encode(o)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "<path>/Airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 153, in default
CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'datetime.datetime' object has no attribute '__module__'
数据库表中的timestamp
字段触发了该错误。指定非timestamp
字段的行为符合预期。
有没有一种方法可以改变返回数据中字段的类型,或者让XCOM正确地解析字段?
1条答案
按热度按时间8i9zcol21#
我也有同样的问题,我使用docker运行airflow,当我使用镜像
apache/airflow:2.5.3-python3.10
时,我得到了这个错误,我通过迁移到apache/airflow:2.6.0-python3.10
解决了这个问题。从我所读到的,直到版本3.7之前的python datetime都有一个名为__module__
的属性来确定定义它的模块的名称。然而,从Python 3.7开始,这个属性不再使用,并且当试图访问它时会发生错误。