如何在Airflow中执行HDFS操作?
确保你安装下面的python软件包
pip安装apache-气流-提供程序-apache-hdfs
#Code Snippet
#Import packages
from airflow import settings
from airflow.models import Connection
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.bash import BashOperator
#Define new DAG
dag_execute_hdfs_commands = DAG(
dag_id ='connect_hdfs',
schedule_interval='@once',
start_date=days_ago(1),
dagrun_timeout=timedelta(minutes=60),
description='excuting hdfs commands',
)
#Establish connection to HDFS
conn =Connection(
conn_id = 'webhdfs_default1',
conn_type='HDFS',
host='localhost',
login='usr_id',
password='password',
port='9000',
)
session = settings.Session()
#Following line will add new connection to you airflow default DB
#Make sure once the DAG runs successfully you comment out following line.
#Because we do not want to add same connection "webhdfs_default1" every time we perform hdfs operations.
session.add(conn) #On your next run comment this out
session.close()
if __name__ == '__main__':
dag_execute_hdfs_commands.cli()
一旦上面的DAG运行成功,你可以执行hdfs操作之后,例如,如果你想列出hdfs目录中的文件,尝试以下代码
#File listing operation
start_task = BashOperator(
task_id="start_task",
bash_command="hdfs dfs -ls /",
dag=dag_execute_hdfs_commands
)
start_task
1条答案
按热度按时间oprakyz71#
您不能将
webhdfs_default
连接与BashOperator
一起使用,因为它使用WebHDFSHook
钩子,这会创建一个客户端来查询Web HDFS服务器。您可以访问客户端进行其他操作:
如果配置文件
core.security
不是kerberos
,则客户端是来自hdfs.InsecureClient
的示例,如果配置文件core.security
不是kerberos
,则客户端是来自hdfs.ext.kerberos.KerberosClient
的示例。下面是hdfs cli客户端的文档,您可以查看哪些操作可用并使用它们。有许多可用的操作,如下载、删除、列出、读取、make_dir ...,您可以在新的Airflow操作符中调用这些操作。