如何合并spark SQL查询的结果,以避免大量的小文件/避免空文件

sigwle7e  于 2023-02-05  发布在  Apache
关注(0)|答案(4)|浏览(291)

上下文:在我们的数据管道中,我们使用spark SQL来运行许多查询,这些查询是由我们的最终用户以文本文件的形式提供的,然后我们对这些文本文件进行参数化。

    • 情况**:

我们的查询如下所示:

INSERT OVERWRITE TABLE ... PARTITION (...)

SELECT 
  stuff
FROM
  sometable

问题是,当你看到这个结果时,它不是创建一堆最大块大小的文件,而是创建200个小文件(因为默认情况下,spark创建200个分区)(对于一些查询,取决于输入数据和SELECT查询,对于200个分区,读取量是无限的)大量的小文件使我们不受系统管理员的欢迎。

    • 尝试修复(不起作用)**

大量文档建议,在这种情况下,您应该使用DISTRIBUTE BY,以确保给定分区的所有数据都进入同一个分区,因此,让我们尝试如下操作:

INSERT OVERWRITE TABLE ... PARTITION (...)

SELECT 
  stuff
FROM
  sometable
DISTRIBUTE BY
  1

那么,为什么这个方法不起作用(在spark 2.0和spark 2.2上测试过)呢?它确实成功地将所有数据发送到一个reducer--所有实际数据都在一个大文件中。但它仍然创建了200个文件,其中199个是空的!(我知道我们可能应该DISTRIBUTE BY我们的分区列,但这是提供最简单的可能示例)

    • 修复确实有效,但不适合我们的使用情形**

通过使用coalescepartition,可以让它做正确的事情,因此(在pyspark语法中):

select = sqlContext.sql('''SELECT stuff FROM sometable''').coalesce(1)
select.write.insertInto(target_table, overwrite=True)

但我不想这样做,因为我们需要完全改变用户向我们提供查询的方式。
我还看到我们可以设置:
conf.set("spark.sql.shuffle.partitions","1");
但我还没有尝试过,因为我不想强制(相当复杂的)查询中的所有计算都发生在一个reducer上,而只在最后写入磁盘的reducer上进行(如果我不应该担心这个问题,请告诉我!)

nnsrf1az

nnsrf1az1#

从spark 2.4开始,您可以在查询中添加一个提示来合并和重新划分最终的Select。

INSERT OVERWRITE TABLE ... PARTITION (...) 
SELECT /*+ REPARTITION(5) */ client_id, country FROM mytable;

这将生成5个文件。
在Spark 2.4之前,可能会影响查询性能,您可以将spark.sql.shuffle.partitions设置为所需文件的数量。

gkl3eglg

gkl3eglg2#

(我知道我们可能应该使用DISTRIBUTE BY分区列,但这只是提供了一个最简单的示例)
所以看起来我简化的尝试是我出错的地方。如果我DISTRIBUTE BY实际的列而不是人为的1(即DISTRIBUTE BY load_date或其他),那么它不会创建空文件。为什么?谁知道呢...
(This也匹配merge-multiple-small-files-in-to-few-larger-files-in-spark线程上的this答案)

k5hmc34c

k5hmc34c3#

这对我来说是一个真实的烦人的问题,我花了一段时间才解决。
以下两种方法对我很有效:
1.在外部将其作为直线脚本运行:

set hive.exec.dynamic.partition.mode=nonstrict;
     set hive.merge.mapfiles=true;
     set hive.merge.mapredfiles=true;
     set hive.merge.smallfiles.avgsize=64512000;
     set hive.merge.size.per.task=12992400;
     set hive.exec.max.dynamic.partitions=2048;
     set hive.exec.max.dynamic.partitions.pernode=1024;

     <insert overwrite command>

这种方法的问题是,不知何故,它不能在pyspark内部工作,而可以作为外部beeline脚本从python脚本中运行
1.使用重新分区
我发现这个选项相当不错。Repartition(x)允许将pyspark Dataframe 的记录压缩到“x”文件中。
现在,由于不可能用一个静态的数字“x”来重新划分每个表,因为表的大小会变化(例如,我不想将一个有10 mil记录的表重新划分为1),我做了以下操作。

-> set an upper threshold for the max number of records a partition should hold 
I use 100,000 

-> compute x as : 
x = df.count()/max_num_records_per_partition
In case the table is partitioned,I use df_partition instead of df...i.e for every set of partition values, i filter df_partition from df; and then compute x from df_partition

-> repartition as:
df = df.repartition(x)
In case if the table is partitioned; i use df_partition = df_partition.repartition(x)

-> insert overwrite dataframe

这种方法对我来说很方便。
更进一步,表中的列数和用于每列的数据类型可用于创建权重,该权重可用于获得对给定dataFrame的再划分数的有效得多的估计(例如,与具有相同类型的5列的dataFrame相比,具有20列的dataFrame将获得更高的权重;与类型为Boolean的1列dataFrame相比,类型为Map的1列dataFrame将获得更高的权重

wr98u20j

wr98u20j4#

双向

合并比重新划分好。
阅读此主题:https://kontext.tech/article/1155/use-spark-sql-partitioning-hints

INSERT OVERWRITE TABLE ... PARTITION (...) 
SELECT /*+ REPARTITION(5) */ client_id, country FROM mytable;
INSERT OVERWRITE TABLE ... PARTITION (...) 
SELECT /*+ COALESCE(5) */ client_id, country FROM mytable;

相关问题