我正在尝试使用aifflow构建一个数据迁移管道,源是dataproc集群上的一个配置单元表,目标是bigquery。我使用dataprochiveoperator从源代码获取模式和数据。这个操作符在内部使用dataproc restapi在我们指定的dataproc集群上提交和执行作业。输出将作为作业日志的一部分写入google云存储的文件中。我只需要这些日志的查询结果。
到目前为止,我已经修改了gcp\u dataproc\u hook.py代码,通过在driveroutputsourceuri参数的帮助下将输出文件的内容作为字符串下载,从而将输出返回给调用方法。这个输出的返回类型是一个pandasDataframe(可以根据我们的方便更改为任何其他类型)。但这包括完整的日志。我必须从中提取查询结果。
下面是我在gcp\u dataproc\u hook.py中添加的代码片段,用于返回已提交查询的输出日志:
# download the output
def getOutput(self,project, output_bucket,output_path):
client = storage.Client(project=self.project_id)
bucket = client.get_bucket(output_bucket)
output_blob = ('/'.join(output_path)+"."+"000000000")
return bucket.blob(output_blob).download_as_string()
#get logs including query output
def getQueryResult(self):
result=self.job_ouput
output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
return df
下面是我尝试执行的一个示例查询:
SHOW CREATE TABLE my_tbl;
输出日志如下所示:
Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0 Connected to: Apache Hive (version 2.3.5)
1 Driver: Hive JDBC (version 2.3.5)
2 Transaction isolation: TRANSACTION_REPEATABLE_...
3 . . . . . . . . . . . . . . . . . . . . . . .>...
4 | createtab_stmt ...
5 +---------------------------------------------...
6 | CREATE TABLE `my_tbl`( ...
7 | `col1` string, ...
8 | `col2` bigint, ...
9 | `col3` string, ...
.. ...
141 | `coln` string) ...
142 | ROW FORMAT SERDE ...
143 | 'org.apache.hadoop.hive.ql.io.orc.OrcSerde...
144 | STORED AS INPUTFORMAT ...
145 | 'org.apache.hadoop.hive.ql.io.orc.OrcInput...
146 | OUTPUTFORMAT ...
147 | 'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...
148 | LOCATION ...
149 | 'gs://my_hive_data_bucket/tmp/base_table/my_tbl...
150 | TBLPROPERTIES ( ...
151 | 'transient_lastDdlTime'='1566842329') ...
152 +---------------------------------------------...
153 143 rows selected (0.154 seconds)
154 Beeline version 2.3.5 by Apache Hive
155 Closing: 0: jdbc:hive2://prod-metastore-test-c...
预期输出应如下所示:
CREATE TABLE `my_tbl`(
`col1` string,
`col2` bigint,
`col3` string,
..
`coln` string,
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
'transient_lastDdlTime'='1566842329')
请给我一个接近解决方案的方法。
1条答案
按热度按时间zbdgwd5y1#
在dataproc中,配置单元查询使用beeline而不是不推荐使用的配置单元cli,这就是默认情况下格式不同的原因。beeline通常会将人类可读的输出格式化为花哨的边框格式,而不是更容易解析的格式。
幸运的是,有一些直线选项可以使格式与旧的hivecli非常接近。您只需创建一个初始化操作,在创建dataproc集群时添加到选项中,并在airflow操作符中指定init\u actions\u uris。创建包含以下内容的文件:
把文件上传到地面军事系统
gs://some-gcs-bucket/beeline-legacyfmt.sh
并将gcs uri设置为dataproc集群的init操作。这将应用默认情况下所需的命令行选项。然后,您发送的任何dataproc配置单元作业现在都将以“tsv2”和“silent”模式输出,这意味着没有无关的日志语句,并且输出将是原始tsv。