Airflow XCom error when querying a SQL datetime field in Python

Posted by cyvaqqii on 2023-04-19 in Python

I need some help. I am trying to query data from an Oracle database, and I am also new to Airflow.
My code looks like this:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.oracle.operators.oracle import OracleOperator

default_args = {
    'owner': 'owner_name',
    'retries': 5,
    'retry_delay': timedelta(minutes=10)
}

with DAG(
    'dag_name',
    default_args=default_args,
    schedule_interval='0 1 * * *',
    start_date=datetime(2023, 4, 10),
) as dag:
    task = OracleOperator(
        task_id='task_name',
        oracle_conn_id='connection_name',
        sql="""
            select
                some_datetime_field,
                some_data_field
            from 
                some_table
        """
    )
    task

But I get this error:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 2305, in xcom_push
    session=session,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 240, in set
    map_index=map_index,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.7/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/usr/local/lib/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'datetime.datetime' object has no attribute '__module__'

After I remove the some_datetime_field line, it works fine, so I am not sure what is wrong. How can I pass a SQL datetime to another task through XCom?


Answer #1 (kq0g1dla)

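Could you try converting the date field value to a string in the query? https://www.oracletutorial.com/oracle-date-functions/oracle-to_char/
The object currently being returned is probably not JSON serializable, which is a requirement for pushing to XCom.
Additional reference: https://github.com/apache/airflow/discussions/24881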
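
A minimal sketch of that suggestion, under a few assumptions: the format mask passed to TO_CHAR is only an example, `rows_to_jsonable` is a hypothetical helper (not part of Airflow), and the sample row is made up. It uses the standard `json` module rather than Airflow's own XCom encoder, so the failure it shows is a `TypeError` and not the `AttributeError` from the traceback in the question; the point is only that a datetime value has to become a string before it can be JSON-encoded.

```python
import json
from datetime import datetime

# Variant of the query from the question with the datetime column
# converted to text inside Oracle. The format mask is an assumption;
# use whichever one the downstream task can parse.
SQL_WITH_TO_CHAR = """
    select
        to_char(some_datetime_field, 'YYYY-MM-DD HH24:MI:SS') as some_datetime_field,
        some_data_field
    from
        some_table
"""


def rows_to_jsonable(rows):
    """Hypothetical helper: replace datetime values with ISO 8601 strings."""
    return [
        [v.isoformat() if isinstance(v, datetime) else v for v in row]
        for row in rows
    ]


# A row shaped like what a database cursor might return (made-up data).
sample_rows = [(datetime(2023, 4, 19, 1, 0, 0), "abc")]

try:
    json.dumps(sample_rows)
    plain_dump_ok = True
except TypeError:
    # The standard JSON encoder rejects datetime objects.
    plain_dump_ok = False

# After conversion the rows contain only strings and encode cleanly.
converted = rows_to_jsonable(sample_rows)
encoded = json.dumps(converted)
```

The query string could be passed as the `sql` argument of the OracleOperator in the question, so the value pushed to XCom is already text. The Python-side conversion is an alternative if the query itself cannot be changed; the receiving task can turn the string back into a datetime with `datetime.fromisoformat`.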
