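I'm trying to build a minimal data pipeline using docker, postgres and airflow. My docker-compose.yaml file can be found here, and it is extended from the one in airflow's documentation here. I've extended it to include a separate postgres database that I will write data to, and a pgadmin instance (both are added near the bottom).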
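When I run docker compose up -d, I can confirm that the services are running and accessible, and I can log in to the airflow web UI to see my dags. I've created a very simple dag that inserts the current date and time into a table every minute. The dag code is below: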
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# airflow.hooks.postgres_hook is a deprecated alias; the hook now lives in
# the apache-airflow-providers-postgres package.
from airflow.providers.postgres.hooks.postgres import PostgresHook

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
}


def create_table():
    # Uses the 'postgres_default' connection configured in the Airflow UI
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    create_query = """
        CREATE TABLE IF NOT EXISTS fact_datetime (
            datetime TIMESTAMP
        );
    """
    cursor.execute(create_query)
    conn.commit()
    cursor.close()
    conn.close()


def insert_datetime():
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    insert_query = """
        INSERT INTO fact_datetime (datetime)
        VALUES (NOW());
    """
    cursor.execute(insert_query)
    conn.commit()
    cursor.close()
    conn.close()


with DAG('insert_datetime_dag',
         default_args=default_args,
         description='DAG to insert current datetime every minute',
         schedule_interval='*/1 * * * *',  # run every minute
         catchup=False) as dag:
    create_table_task = PythonOperator(
        task_id='create_table',
        python_callable=create_table,
    )
    insert_datetime_task = PythonOperator(
        task_id='insert_datetime',
        python_callable=insert_datetime,
    )
    # Make sure the table exists before inserting into it
    create_table_task >> insert_datetime_task
```
Before running this dag, I added a postgres connection in the airflow web UI, which should let me use PostgresHook.
When I run the dag, the run seems to get stuck on the create_table task, with the following log:
```
*** Found local files:
*** * /opt/airflow/logs/dag_id=insert_datetime_dag/run_id=scheduled__2024-01-02T17:24:00+00:00/task_id=create_table/attempt=1.log
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [queued]>
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [queued]>
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2192} INFO - Executing <Task(PythonOperator): create_table> on 2024-01-02 17:24:00+00:00
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:60} INFO - Started process 148 to run task
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'insert_datetime_dag', 'create_table', 'scheduled__2024-01-02T17:24:00+00:00', '--job-id', '7', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmpkkdtejih']
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:88} INFO - Job 7: Subtask create_table
[2024-01-02, 17:25:26 UTC] {task_command.py:423} INFO - Running <TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [running]> on host ce682335169d
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='insert_datetime_dag' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2024-01-02T17:24:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-02T17:24:00+00:00'
[2024-01-02, 17:25:26 UTC] {base.py:83} INFO - Using connection ID 'postgres_default' for task execution.
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/dag.py", line 16, in create_table
    conn = pg_hook.get_conn()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 158, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/airflow/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
    Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
    Is the server running on that host and accepting TCP/IP connections?
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1138} INFO - Marking task as UP_FOR_RETRY. dag_id=insert_datetime_dag, task_id=create_table, execution_date=20240102T172400, start_date=20240102T172526, end_date=20240102T172526
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 7 for task create_table (connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
    Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
    Is the server running on that host and accepting TCP/IP connections?
; 148)
[2024-01-02, 17:25:26 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-01-02, 17:25:26 UTC] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
If I'm reading this correctly, airflow can't see my postgres instance. I think this should be fixed by exposing port 5432 to the airflow services.
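I don't know which service needs to expose the port, and I don't know how to edit my docker compose file. Could someone please:
- confirm whether my assessment of the problem is correct, and
- suggest the right edits to my docker compose file so that I can run my dag successfully?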
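One way to test this assessment is a plain TCP connection attempt from inside the Airflow scheduler or worker container. Inside a container, localhost refers to that container itself, so a connection to localhost:5432 is refused even while the separate Postgres container is running. The sketch below uses only the Python standard library; the helper name `can_connect` is hypothetical, and it only checks that something is listening on the port, not that it speaks the Postgres protocol.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical diagnostic helper: it only checks that a listener exists,
    it does not authenticate against Postgres.
    """
    try:
        # create_connection resolves the name and tries each returned address
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts and name-resolution errors
        return False
```

Run inside the Airflow container (for example via docker compose exec), `can_connect("localhost", 5432)` would be expected to return False in this setup, while `can_connect("db", 5432)` should return True if `db` is the service name of the Postgres instance you added (an assumption based on the answers below). Containers on the same Compose network reach each other on the container port directly; a `ports:` mapping only publishes a port to the host machine.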
2 Answers
d5vmydt9 #1
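Fix:
You should be able to connect to the db postgres service by setting the postgres_default connection variable so that it points to that service, under line 55 of your docker-compose file.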
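Airflow can read connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` (conn id upper-cased) whose value is a connection URI, and it checks them before connections stored in the metadata database. A minimal sketch that builds such a URI for `postgres_default` is below; the user, password and database values are placeholder assumptions and must match the credentials configured on your `db` service.

```python
from urllib.parse import quote


def build_postgres_conn_uri(user: str, password: str, host: str,
                            port: int, database: str) -> str:
    """Build a URI suitable for AIRFLOW_CONN_POSTGRES_DEFAULT.

    Each component is percent-encoded so that characters such as '@', ':'
    or '/' in a password do not break URI parsing.
    """
    return (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Placeholder values (assumptions): use the POSTGRES_USER, POSTGRES_PASSWORD
# and POSTGRES_DB actually defined for the "db" service in docker-compose.
uri = build_postgres_conn_uri("myuser", "p@ss:word", "db", 5432, "mydb")
print(f"AIRFLOW_CONN_POSTGRES_DEFAULT: '{uri}'")
```

The printed line could go under the `environment:` section shared by the Airflow services (assumed here to be the `x-airflow-common` block of the official file). Editing the existing postgres_default connection in the web UI so that its Host is `db` instead of localhost would have the same effect.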
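Additional notes:
Airflow's own metadata database runs under the hostname postgres, but that is not the postgres service you added in docker-compose. The postgres service you added runs under the hostname db.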
4xrmg8kj #2
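Docker Compose gives each service defined in your docker-compose.yml a network hostname entry. By default, that hostname is the service's name. If you don't use the correct name of a service defined in docker-compose.yml, you won't be able to reach it.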
You are trying to connect to localhost, but your docker-compose defines the postgresql hostnames as db and postgres, and I assume the one you added is postgres.
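Because service names are resolved by Docker's embedded DNS on the Compose network, you can check which names are visible from inside the Airflow container with the standard library alone. A minimal sketch; `resolve_hosts` is a hypothetical helper, and `db` and `postgres` are the service names mentioned above.

```python
import socket
from typing import Dict, Iterable, Optional


def resolve_hosts(names: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each hostname to its first IPv4 address, or None if unresolvable."""
    result: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            result[name] = socket.gethostbyname(name)
        except OSError:
            # socket.gaierror (a subclass of OSError): the resolver does not
            # know this name, e.g. it is not a service on this network
            result[name] = None
    return result
```

Inside the Airflow container, `resolve_hosts(["localhost", "db", "postgres"])` would typically map localhost to 127.0.0.1 (the container itself), while `db` and `postgres` would be expected to resolve to the addresses of the corresponding containers; a None result means the name is not a service reachable on that network.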
Change your code to use the hostname db or postgres.