如何使用dataprochiveoperator从配置单元作业输出日志中提取查询结果?

e5nqia27  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(378)

我正在尝试使用aifflow构建一个数据迁移管道,源是dataproc集群上的一个配置单元表,目标是bigquery。我使用dataprochiveoperator从源代码获取模式和数据。这个操作符在内部使用dataproc restapi在我们指定的dataproc集群上提交和执行作业。输出将作为作业日志的一部分写入google云存储的文件中。我只需要这些日志的查询结果。
到目前为止,我已经修改了gcp\u dataproc\u hook.py代码,通过在driveroutputsourceuri参数的帮助下将输出文件的内容作为字符串下载,从而将输出返回给调用方法。这个输出的返回类型是一个pandasDataframe(可以根据我们的方便更改为任何其他类型)。但这包括完整的日志。我必须从中提取查询结果。
下面是我在gcp\u dataproc\u hook.py中添加的代码片段,用于返回已提交查询的输出日志:


# download the output

    def getOutput(self,project, output_bucket,output_path):
        client = storage.Client(project=self.project_id)
        bucket = client.get_bucket(output_bucket)
        output_blob = ('/'.join(output_path)+"."+"000000000")
        return bucket.blob(output_blob).download_as_string()

    #get logs including query output
    def getQueryResult(self):
        result=self.job_ouput
        output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
        df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
        return df

下面是我尝试执行的一个示例查询:

SHOW CREATE TABLE my_tbl;

输出日志如下所示:

Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0            Connected to: Apache Hive (version 2.3.5)             
1                    Driver: Hive JDBC (version 2.3.5)             
2    Transaction isolation: TRANSACTION_REPEATABLE_...             
3    . . . . . . . . . . . . . . . . . . . . . . .>...             
4    |                   createtab_stmt            ...             
5    +---------------------------------------------...             
6    | CREATE TABLE `my_tbl`(       ...             
7    |   `col1` string,            ...             
8    |   `col2` bigint,                   ...             
9    |   `col3` string,                 ...                         
..                                                 ...                         
141  |   `coln` string)                 ...             
142  | ROW FORMAT SERDE                            ...             
143  |   'org.apache.hadoop.hive.ql.io.orc.OrcSerde...             
144  | STORED AS INPUTFORMAT                       ...             
145  |   'org.apache.hadoop.hive.ql.io.orc.OrcInput...             
146  | OUTPUTFORMAT                                ...             
147  |   'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...             
148  | LOCATION                                    ...             
149  |   'gs://my_hive_data_bucket/tmp/base_table/my_tbl...             
150  | TBLPROPERTIES (                             ...             
151  |   'transient_lastDdlTime'='1566842329')     ...             
152  +---------------------------------------------...             
153                  143 rows selected (0.154 seconds)             
154               Beeline version 2.3.5 by Apache Hive             
155  Closing: 0: jdbc:hive2://prod-metastore-test-c...

预期输出应如下所示:

CREATE TABLE `my_tbl`(
  `col1` string,
  `col2` bigint,
  `col3` string,
  ..
  `coln` string,  
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
  'transient_lastDdlTime'='1566842329')

请给我一个接近解决方案的方法。

zbdgwd5y

zbdgwd5y1#

在dataproc中,配置单元查询使用beeline而不是不推荐使用的配置单元cli,这就是默认情况下格式不同的原因。beeline通常会将人类可读的输出格式化为花哨的边框格式,而不是更容易解析的格式。
幸运的是,有一些直线选项可以使格式与旧的hivecli非常接近。您只需创建一个初始化操作,在创建dataproc集群时添加到选项中,并在airflow操作符中指定init\u actions\u uris。创建包含以下内容的文件:


# !/bin/bash

sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline

把文件上传到地面军事系统 gs://some-gcs-bucket/beeline-legacyfmt.sh 并将gcs uri设置为dataproc集群的init操作。这将应用默认情况下所需的命令行选项。然后,您发送的任何dataproc配置单元作业现在都将以“tsv2”和“silent”模式输出,这意味着没有无关的日志语句,并且输出将是原始tsv。

相关问题