热释光;dr-我正在将大量数据写入hive上的一个新Parquet格式表中,但该作业使用的减缩器远远少于指定的减缩器,这使得写入所需的时间比我希望的要长得多。
我正在构建一个数据湖表,打算用spark创建快速读取,但是我正在用hive编写数据,这样a)bucked表可以被hive读取,b)这样我就可以将统计信息写入hive元存储。
我使用python创建表,如下所示:
hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")
hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")
hivecur.execute('set mapred.reduce.tasks=100')
hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")
hivecur.execute(f"drop table if exists {TABLE_NAME}")
table_create_qry = f"""
create table {TABLE_NAME} (
{schema.dice}
)
partitioned by (process_date_z int, dataset string)
clustered by (id) sorted by (source_id, type, id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}",
"parquet.compress" = "snappy")
当我插入:
qry = f"""
insert overwrite table {TABLE_NAME} partition (process_date_z, dataset)
select ...
source_id,
process_date_z,
'{dataset}' as dataset
from {source_table}
where process_date_z = {d}
and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""
通过设置 mapred.reduce.tasks=100
我希望我能强制每个分区包含100个文件。相反,尽管创建了100个任务,但92个任务的完成速度非常快,8个reduce任务的运行时间更长,只需编写大约相同大小的低十个(但不是100个)文件。
这样做的问题是,缩减是写过程中的一个重要瓶颈。我可以设置什么参数来提高性能?
1条答案
按热度按时间k4aesqcs1#
我认为我的问题来自于对散列函数的愚蠢选择。
我怀疑用于按id进行bucket的算法与我用于子集id的哈希相同,因此它为所有可能的输入id创建了一个bucket,但pmod只允许它填充少数几个。
为了解决这个问题,我用brickhouse的murringhash3udf切换了pmod中的散列。
https://github.com/klout/brickhouse