通过配置单元上的parquet提高写并行性

ghg1uchk  于 2021-06-25  发布在  Hive
关注(0)|答案(1)|浏览(336)

热释光;dr-我正在将大量数据写入hive上的一个新Parquet格式表中,但该作业使用的减缩器远远少于指定的减缩器,这使得写入所需的时间比我希望的要长得多。
我正在构建一个数据湖表,打算用spark创建快速读取,但是我正在用hive编写数据,这样a)bucked表可以被hive读取,b)这样我就可以将统计信息写入hive元存储。
我使用python创建表,如下所示:

hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")

hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")

hivecur.execute('set mapred.reduce.tasks=100')

hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")

hivecur.execute(f"drop table if exists {TABLE_NAME}")

table_create_qry = f"""
create table {TABLE_NAME} (
    {schema.dice}
)
partitioned by (process_date_z int, dataset string)
clustered by (id) sorted by (source_id, type, id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}",
               "parquet.compress" = "snappy")

当我插入:

qry = f"""
        insert overwrite table {TABLE_NAME} partition (process_date_z, dataset)
        select ...
            source_id,
            process_date_z,
            '{dataset}' as dataset
        from {source_table}
        where process_date_z = {d}
        and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""

通过设置 mapred.reduce.tasks=100 我希望我能强制每个分区包含100个文件。相反,尽管创建了100个任务,但92个任务的完成速度非常快,8个reduce任务的运行时间更长,只需编写大约相同大小的低十个(但不是100个)文件。
这样做的问题是,缩减是写过程中的一个重要瓶颈。我可以设置什么参数来提高性能?

k4aesqcs

k4aesqcs1#

我认为我的问题来自于对散列函数的愚蠢选择。
我怀疑用于按id进行bucket的算法与我用于子集id的哈希相同,因此它为所有可能的输入id创建了一个bucket,但pmod只允许它填充少数几个。
为了解决这个问题,我用brickhouse的murringhash3udf切换了pmod中的散列。
https://github.com/klout/brickhouse

相关问题