如何在气流中连接hdfs?

j5fpnvbx  于 2023-02-14  发布在  HDFS
关注(0)|答案(1)|浏览(210)

如何在Airflow中执行HDFS操作?
确保你安装下面的python软件包
pip安装apache-气流-提供程序-apache-hdfs

#Code Snippet

#Import packages 
from airflow import settings
from airflow.models import Connection
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.bash import BashOperator
 
#Define new DAG
dag_execute_hdfs_commands = DAG(
      dag_id ='connect_hdfs',
      schedule_interval='@once',
      start_date=days_ago(1),
      dagrun_timeout=timedelta(minutes=60),
      description='excuting hdfs commands',
     )

#Establish connection to HDFS
conn =Connection(
     conn_id = 'webhdfs_default1',
     conn_type='HDFS',
     host='localhost',
     login='usr_id',
     password='password',
     port='9000',
    )
session = settings.Session()

#Following line will add new connection to you airflow default DB
#Make sure once the DAG runs successfully you comment out following line.
#Because we do not want to add same connection "webhdfs_default1" every time we perform hdfs operations.
session.add(conn) #On your next run comment this out
session.close()

if __name__ == '__main__':
    dag_execute_hdfs_commands.cli()

一旦上面的DAG运行成功,你可以执行hdfs操作之后,例如,如果你想列出hdfs目录中的文件,尝试以下代码

#File listing operation
start_task = BashOperator(
         task_id="start_task",
         bash_command="hdfs dfs -ls /",
         dag=dag_execute_hdfs_commands
         )

start_task
oprakyz7

oprakyz71#

您不能将webhdfs_default连接与BashOperator一起使用,因为它使用WebHDFSHook钩子,这会创建一个客户端来查询Web HDFS服务器。

  • 检查路径:检查hdfs中是否存在文件
  • load_file:将文件上载到hdfs

您可以访问客户端进行其他操作:

webHDFS_hook = WebHDFSHook(webhdfs_conn_id="<you conn id>")
client = webHDFS_hook.get_conn()
client.<operation>

如果配置文件core.security不是kerberos,则客户端是来自hdfs.InsecureClient的示例,如果配置文件core.security不是kerberos,则客户端是来自hdfs.ext.kerberos.KerberosClient的示例。下面是hdfs cli客户端的文档,您可以查看哪些操作可用并使用它们。
有许多可用的操作,如下载、删除、列出、读取、make_dir ...,您可以在新的Airflow操作符中调用这些操作。

相关问题