在使用impala插入(parquet)表期间对分区键进行sql排序

hwazgwia  于 2021-05-27  发布在  Hadoop
关注(0)|答案(2)|浏览(953)

我有一个etl作业,我想将.csv文件中的数据附加到impala表中。目前,我通过使用新数据(以.csv.lzo格式)创建一个临时的外部.csv表来实现这一点,然后将其插入到主表中。
我使用的查询如下所示:

INSERT INTO TABLE main_table
PARTITION(yr, mth)
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT) AS yr,
    CAST(extract(ts, "month") AS TINYINT) AS mth
FROM csv_table

哪里 main_table 定义如下(多个列被截断):

CREATE TABLE IF NOT EXISTS main_table (
    tid             INT,
    s1              VARCHAR,
    s2              VARCHAR,
    status          TINYINT,
    ts              TIMESTAMP,
    n1              DOUBLE,
    n2              DOUBLE,
    p               DECIMAL(3,2),
    mins            SMALLINT,
    temp            DOUBLE
)
PARTITIONED BY (yr SMALLINT, mth TINYINT)
STORED AS PARQUET

数据的大小约为几gb(5500万行,约30列),这需要一个多小时才能运行。我很好奇为什么会出现这种情况(因为对于本质上是追加操作的东西来说,这种情况似乎很长),在查询计划中遇到了这种情况:

F01:PLAN FRAGMENT [HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))] hosts=2 instances=2
|  Per-Host Resources: mem-estimate=1.01GB mem-reservation=12.00MB thread-reservation=1
WRITE TO HDFS [default.main_table, OVERWRITE=false, PARTITION-KEYS=(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))]
|  partitions=unavailable
|  mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
|
02:SORT
|  order by: CAST(extract(ts, 'year') AS SMALLINT) ASC NULLS LAST, CAST(extract(ts, 'month') AS TINYINT) ASC NULLS LAST
|  materialized: CAST(extract(ts, 'year') AS SMALLINT), CAST(extract(ts, 'month') AS TINYINT)
|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
|  tuple-ids=1 row-size=1.29KB cardinality=unavailable
|  in pipelines: 02(GETNEXT), 00(OPEN)
|
01:EXCHANGE [HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))]
|  mem-estimate=2.57MB mem-reservation=0B thread-reservation=0
|  tuple-ids=0 row-size=1.28KB cardinality=unavailable
|  in pipelines: 00(GETNEXT)
|

显然,大部分时间和资源都花在了分区键的排序上:

Operator       #Hosts  Avg Time  Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail                                                                                          
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
02:SORT             2    17m16s    30m50s  55.05M          -1  25.60 GB       12.00 MB                                                                                                  
01:EXCHANGE         2   9s493ms  12s822ms  55.05M          -1  26.98 MB        2.90 MB  HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT)) 
00:SCAN HDFS        2  51s958ms     1m10s  55.05M          -1  76.06 MB      704.00 MB  default.csv_table

Impala 为什么要这么做?有没有什么方法可以在不必对分区键排序的情况下对表进行分区,或者有没有一种方法可以加快分区速度?在我的例子中,我尝试附加的整个.csv文件只有1到2个分区键?
编辑:这很可能是因为我使用的是Parquet文件格式。不过,我的问题仍然适用:当我知道几乎不需要排序时,有没有办法加快排序的速度?
相比之下,像 SELECT COUNT(*) FROM csv_table WHERE extract(ts, "year") = 2018 AND extract(ts, "month") = 1 大约需要2-3分钟,而 ORDER BY (插入时所做的)需要一个多小时。这个例子只有键(2018,1)和(2018,2)。

q5lcpyga

q5lcpyga1#

您可以添加提示以禁用排序阶段。

INSERT INTO TABLE main_table
PARTITION(yr, mth)  /* +NOCLUSTERED */
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT) AS yr,
    CAST(extract(ts, "month") AS TINYINT) AS mth
FROM csv_table

作为解释here:optimizer hints
/+clustered/和/+noclustered/hints/+clustered/在插入之前按分区列对数据进行排序,以确保每个节点一次只写入一个分区。使用此提示可同时减少保持打开的文件数和内存中保持的缓冲区数。这种技术主要用于插入Parquet表中,其中大的块大小需要大量的内存来同时缓冲多个输出文件的数据。此提示在impala 2.8或更高版本中可用。从impala3.0开始,/+clustered/是hdfs表的默认行为。
/+noclustered/在插入之前不按主键排序。此提示在impala 2.8或更高版本中可用。在插入到kudu表时使用此提示。
在低于impala3.0的版本中,/+noclustered/是hdfs表中的默认值。

xzabzqsa

xzabzqsa2#

impala进行排序是因为您使用动态分区。特别是对于带有oncomputed stats的表,impala不太擅长动态分区。我建议您在动态分区的情况下使用hive。如果您不打算使用hive,我的建议是:
在每个insert into语句之前,请计算csv表上的统计信息。
如果第一步做得不好,可以对几个可能的分区使用静态分区,然后运行超出可能范围的动态分区。例如;如果有一个年和月选项:

INSERT 
INTO TABLE main_table
PARTITION(yr=2019, mth=2)
SELECT
    *
FROM csv_table where CAST(extract(ts, "year") AS SMALLINT)=2019 and CAST(extract(ts, "month") AS TINYINT)=2;  
INSERT INTO TABLE main_table
PARTITION(yr, mth)
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT),
    CAST(extract(ts, "month") AS TINYINT)
FROM csv_table where CAST(extract(ts, "year") AS SMALLINT)!=2019 and CAST(extract(ts, "month") AS TINYINT)!=2;

这些语句缩小了动态分区将处理的集合。希望能减少花费的总时间。

相关问题