Exposing the correct port to the Airflow service in Docker

bz4sfanl asked 12 months ago in Docker

I'm trying to build a minimal data pipeline using Docker, Postgres, and Airflow. My docker-compose.yaml file can be found here; it extends the one from Airflow's documentation here. I've extended it to include a separate Postgres database that I'll write data to, plus a pgAdmin instance (both added near the bottom).
When I run docker compose up -d, I can confirm the services are running and reachable, and I can log in to the Airflow web UI and see my DAGs. I've created a very simple DAG that inserts the date and time into a table every minute. The DAG code is below:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
}

def create_table():
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    create_query = """
        CREATE TABLE IF NOT EXISTS fact_datetime (
            datetime TIMESTAMP
        );
    """
    cursor.execute(create_query)
    conn.commit()
    cursor.close()
    conn.close()

def insert_datetime():
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    insert_query = """
        INSERT INTO fact_datetime (datetime)
        VALUES (NOW());
    """
    cursor.execute(insert_query)
    conn.commit()
    cursor.close()
    conn.close()

with DAG('insert_datetime_dag',
         default_args=default_args,
         description='DAG to insert current datetime every minute',
         schedule_interval='*/1 * * * *',
         catchup=False) as dag:

    create_table_task = PythonOperator(
        task_id='create_table',
        python_callable=create_table,
    )

    insert_datetime_task = PythonOperator(
        task_id='insert_datetime',
        python_callable=insert_datetime,
    )

    create_table_task >> insert_datetime_task

Before running this DAG, I added a Postgres connection in the Airflow web UI, which should let me use PostgresHook.
When I run the DAG, it seems to get stuck on the create_table task, with the following log:

ce682335169d
*** Found local files:
***   * /opt/airflow/logs/dag_id=insert_datetime_dag/run_id=scheduled__2024-01-02T17:24:00+00:00/task_id=create_table/attempt=1.log
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [queued]>
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [queued]>
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2192} INFO - Executing <Task(PythonOperator): create_table> on 2024-01-02 17:24:00+00:00
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:60} INFO - Started process 148 to run task
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'insert_datetime_dag', 'create_table', 'scheduled__2024-01-02T17:24:00+00:00', '--job-id', '7', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmpkkdtejih']
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:88} INFO - Job 7: Subtask create_table
[2024-01-02, 17:25:26 UTC] {task_command.py:423} INFO - Running <TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [running]> on host ce682335169d
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='insert_datetime_dag' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2024-01-02T17:24:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-02T17:24:00+00:00'
[2024-01-02, 17:25:26 UTC] {base.py:83} INFO - Using connection ID 'postgres_default' for task execution.
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/dag.py", line 16, in create_table
    conn = pg_hook.get_conn()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 158, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/airflow/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
    Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
    Is the server running on that host and accepting TCP/IP connections?
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1138} INFO - Marking task as UP_FOR_RETRY. dag_id=insert_datetime_dag, task_id=create_table, execution_date=20240102T172400, start_date=20240102T172526, end_date=20240102T172526
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 7 for task create_table (connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
    Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
    Is the server running on that host and accepting TCP/IP connections?
; 148)
[2024-01-02, 17:25:26 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-01-02, 17:25:26 UTC] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check


If I'm reading this correctly, Airflow can't see my Postgres instance. I believe this should be fixed by exposing port 5432 to the Airflow services.
I don't know which service needs the port exposed, nor how to edit my docker-compose file to do it. Could someone:

  • confirm whether my assessment of the problem is correct, and
  • suggest the correct edits to my docker-compose file so that I can run my DAG successfully.

d5vmydt91#

The fix:

You should be able to connect to the db Postgres service by setting the postgres_default connection variable so that it points at that service.
Below line 55 of your docker-compose:

AIRFLOW_CONN_POSTGRES_DEFAULT: postgres://root:root@db:5432/airflow_db

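For orientation, in the official docker-compose.yaml that area falls inside the x-airflow-common environment block, so every Airflow container (webserver, scheduler, worker, triggerer) inherits the variable. A minimal sketch of the placement, assuming the standard layout; the root:root credentials and airflow_db database name are taken from the URI above, so substitute whatever your db service is actually configured with:

x-airflow-common:
  &airflow-common
  # ...
  environment:
    &airflow-common-env
    # ... existing AIRFLOW__* variables ...
    # Airflow resolves any AIRFLOW_CONN_<conn_id> environment variable as a
    # connection, so this defines postgres_default without touching the web UI.
    AIRFLOW_CONN_POSTGRES_DEFAULT: postgres://root:root@db:5432/airflow_db

Note that a connection defined this way won't appear in the UI's connection list; Airflow reads it straight from the environment at lookup time.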

Additional notes:

  • I'm not able to comment under your question, but the comment saying "it looks like you're trying to connect to a database on localhost, while the database is actually running on the hostname postgres" is slightly off. There is indeed a Postgres service running under the hostname postgres, but it isn't the one you added to your docker-compose. The Postgres service you added runs under the hostname db.
  • If your goal is to use the database outside of Airflow as well, you could consider a PostgreSQL database running locally, outside of Docker. Assuming the port and database name stay the same, the line in your docker-compose would then be (but see the Linux caveat after this list):
AIRFLOW_CONN_POSTGRES_DEFAULT: postgres://<username>:<password>@host.docker.internal:5432/airflow_db
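One caveat with this approach: host.docker.internal is only provided automatically by Docker Desktop on macOS and Windows. On a plain Linux engine you would typically have to map it yourself with extra_hosts; a minimal sketch, assuming the entry is added to each Airflow service that needs to reach the host-side database:

# hypothetical excerpt; repeat for the scheduler, worker, etc. as needed
airflow-webserver:
  extra_hosts:
    - "host.docker.internal:host-gateway"  # resolves the name to the host's gateway IP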


4xrmg8kj2#

Docker Compose creates a network host entry for every service defined in your docker-compose.yml file; by default it uses the service's name. If you don't use the correct name of a service as defined in docker-compose.yml, you won't be able to reach it.
You're trying to connect to localhost, but your docker-compose defines the PostgreSQL hostnames db and postgres; I assume the one you added is postgres.

postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

Change your code to use the hostname db or postgres.
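On the asker's original hypothesis: a ports: mapping in docker-compose only publishes a container port to the host machine; traffic between containers on the same Compose network never needs it and always goes through the service name. A hedged sketch of what the asker's separate db service might look like (the image tag, credentials, and host port here are assumptions):

db:
  image: postgres:13               # assumed image/tag
  environment:
    POSTGRES_USER: root            # assumed credentials, matching the URI in the first answer
    POSTGRES_PASSWORD: root
    POSTGRES_DB: airflow_db
  ports:
    - "5433:5432"                  # only for host-side clients such as a local psql or pgAdmin;
                                   # Airflow containers reach this service as db:5432 regardless

With that in place, the URI from the first answer, postgres://root:root@db:5432/airflow_db, resolves from inside any container on the same Compose network.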
