我正在尝试将hiveoperator(hivesql)的输出导出到csv文件并将其存储在本地。想知道我怎样才能在气流中做。有人能谈谈你的想法吗?
x0fgdtte1#
可以创建继承 HiveOperator 在您的dag中,如下所示:
HiveOperator
class CustomHiveOp(HiveOperator): def execute(context): self.log.info('Executing: %s', self.hql) self.hook = self.get_hook() self.conn.to_csv( hql=self.hql, csv_filepath=self.output_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)
并将其用作:
hive_csv = CustomHiveOp( task_id='hive_to_csv', hql='YOUR_HIVE_QUERY', hive_cli_conn_id='' )
yiytaume2#
如果你使用一个 PythonOperator 或子类 HiveOperator 使用 HiveServer2Hook 在 execute 方法:
PythonOperator
HiveServer2Hook
execute
def execute(context): ... self.hook = HiveServer2Hook(...) self.conn = self.hook.get_conn() self.conn.to_csv(hql=self.hql, csv_filepath=self.output_filepath, ...)
2条答案
按热度按时间x0fgdtte1#
可以创建继承
HiveOperator
在您的dag中,如下所示:并将其用作:
yiytaume2#
如果你使用一个
PythonOperator
或子类HiveOperator
使用HiveServer2Hook
在execute
方法: