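I run Airflow in Docker to execute my DAGs. I want to import SnowflakeOperatorAsync from astronomer.providers.snowflake. Every time, even after reinstalling Python, changing environment settings, and so on, Airflow reports this error: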
Broken DAG: [/opt/airflow/dags/astro_orders.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/astro_orders.py", line 12, in <module>
    from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync
ModuleNotFoundError: No module named 'astronomer'
from datetime import datetime
from airflow.models import DAG
from pandas import DataFrame
from airflow.models.baseoperator import chain
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator, SQLTableCheckOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import datetime
from airflow.utils.task_group import TaskGroup
from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync
S3_FILE_PATH = "s3://kack-astrosdk/"
S3_CONN_ID = "aws_default"
SNOWFLAKE_CONN_ID = "snowflake_default"
SNOWFLAKE_ORDERS = "orders_table"
SNOWFLAKE_FILTERET_ORDERS = "filtered_table"
SNOWFLAKE_JOINED = "joined_table"
SNOWFLAKE_CUSTOMERS = "customers_table"
SNOWFLAKE_REPORTING = "reporting_table"
SNOWFLAKE_FORESTFIRE_TABLE = "forestfires"
with DAG(
    dag_id='snowflake_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:
    create_table = SnowflakeOperatorAsync(
        task_id='create_table',
        snowflake_conn_id='snowflake_connection',
        sql='CREATE TABLE test (Imie VARCHAR, Nazwisko VARCHAR, Numer INT)',
    )
    insert_data_1 = SnowflakeOperatorAsync(
        task_id='insert_data_1',
        snowflake_conn_id='snowflake_connection',
        sql="INSERT INTO test (Imie, Nazwisko, Numer) VALUES ('Marcin', 'Laczek', 1), ('Jadwiga', 'Herbut', 2)",
    )
    insert_data_2 = SnowflakeOperatorAsync(
        task_id='insert_data_2',
        snowflake_conn_id='snowflake_connection',
        sql="INSERT INTO test (Imie, Nazwisko, Numer) VALUES ('Stanislaw', 'Maczek', 3), ('Karolina', 'Hohoł', 4)",
    )
    create_table >> [insert_data_1, insert_data_2]
Does anyone know how to solve this?
I have already tried a clean reinstall of Python and setting up my Airflow Docker environment from scratch.
1 answer
z31licg01#
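The Airflow Docker image does not ship with astronomer-providers, the package that provides the astronomer module. You will have to install it in your image, or you can use the Astronomer Runtime image. To install it in your image, add the following command to your Dockerfile: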
RUN pip install astronomer-providers
Alternatively, get the Astronomer Runtime image, which ships with astronomer-providers, from https://quay.io/repository/astronomer/astro-runtime
Example: https://github.com/astronomer/astronomer-providers/blob/main/.circleci/integration-tests/Dockerfile.astro_cloud#L2
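After rebuilding the image, it can help to confirm that the package is actually visible to the Python interpreter inside the container, since reinstalling Python on the host does not change what the container sees. The following is a minimal sketch, not part of the original answer; the helper name module_available is hypothetical, and it relies only on the standard-library importlib.util.find_spec.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if the module `name` can be located by this interpreter."""
    try:
        # find_spec returns None when a top-level module is not installed.
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # Raised for dotted names whose parent package is missing,
        # or for modules with a broken __spec__.
        return False


if __name__ == "__main__":
    # "astronomer" is the top-level module named in the traceback.
    for mod in ("airflow", "astronomer"):
        status = "found" if module_available(mod) else "MISSING"
        print(f"{mod}: {status}")
```

Run the script with the container's own Python (for example through docker exec on the scheduler container), not on the host. If astronomer is reported as missing there, the running container was most likely not built from the image that includes the pip install step.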