我在airflow中创建了一个dag,它将数据从一个hdfs集群传输到另一个集群,并将数据加载到一个hive表中
我的气流代码是
CLUSTER2_HDFS_PATH = "hdfs://cluster2/user/{}/copydir/temp_file".format(JOB_USER)
distcp_cmd = 'hadoop distcp ' + \
' -overwrite {{ dag_run.conf["cluster1_path"] }} ' + CLUSTER2_HDFS_PATH
copy_file_hdpml_lucy = BashOperator(
task_id="copy_file_cluster1_cluster2",
bash_command=distcp_cmd,
env=utils.get_bash_env(JOB_USER),
dag=dag,
)
insert_hive_table = HiveOperator(
task_id='insert_dummy_hive_table',
hql='hive/parametarized_insert_table.hql',
hive_cli_conn_id=HIVE_CONN_ID,
params={
'hdfs_load_path': CLUSTER2_HDFS_PATH,
},
schema='fraud',
dag=dag
)
我的配置单元hql查询是
LOAD DATA INPATH '{{ params.hdfs_load_path }}' OVERWRITE INTO TABLE {{ dag_run.conf["db_name"] }}.{{ dag_run.conf["table_name"] }};
我可以使用api调用调用dag
curl -X POST \
http://prd-airflow:8080/api/experimental/dags/fra_hive_api_test/dag_runs \
-H 'Cache-Control: no-cache' \
-d '{"conf":"{ \"cluster1_path\":\"hdfs://cluster1/user/neel.choudhury/input.csv\" , \"db_name\": \"fraud\",\"table_name\": \"dummy_score\" }"}'
我的问题是我可以在bash或hive操作符中访问post请求中conf中发送的值。不过,我还想参数化一些参数,我用来创建运算符。例如,我需要参数化 hive_cli_conn_id
这样我就可以从api调用本身发送这个值。
有没有一种方法可以在创建操作符的参数也可以参数化的情况下实现呢。
暂无答案!
目前还没有任何答案,快来回答吧!