上下文:在我们的数据管道中,我们使用spark SQL来运行许多查询,这些查询是由我们的最终用户以文本文件的形式提供的,然后我们对这些文本文件进行参数化。
- 情况**:
我们的查询如下所示:
INSERT OVERWRITE TABLE ... PARTITION (...)
SELECT
stuff
FROM
sometable
问题是,当你看到这个结果时,它不是创建一堆最大块大小的文件,而是创建200个小文件(因为默认情况下,spark创建200个分区)(对于一些查询,取决于输入数据和SELECT
查询,对于200个分区,读取量是无限的)大量的小文件使我们不受系统管理员的欢迎。
- 尝试修复(不起作用)**
大量文档建议,在这种情况下,您应该使用DISTRIBUTE BY
,以确保给定分区的所有数据都进入同一个分区,因此,让我们尝试如下操作:
INSERT OVERWRITE TABLE ... PARTITION (...)
SELECT
stuff
FROM
sometable
DISTRIBUTE BY
1
那么,为什么这个方法不起作用(在spark 2.0和spark 2.2上测试过)呢?它确实成功地将所有数据发送到一个reducer--所有实际数据都在一个大文件中。但它仍然创建了200个文件,其中199个是空的!(我知道我们可能应该DISTRIBUTE BY
我们的分区列,但这是提供最简单的可能示例)
- 修复确实有效,但不适合我们的使用情形**
通过使用coalesce
或partition
,可以让它做正确的事情,因此(在pyspark
语法中):
select = sqlContext.sql('''SELECT stuff FROM sometable''').coalesce(1)
select.write.insertInto(target_table, overwrite=True)
但我不想这样做,因为我们需要完全改变用户向我们提供查询的方式。
我还看到我们可以设置:conf.set("spark.sql.shuffle.partitions","1");
但我还没有尝试过,因为我不想强制(相当复杂的)查询中的所有计算都发生在一个reducer上,而只在最后写入磁盘的reducer上进行(如果我不应该担心这个问题,请告诉我!)
- 问题**:
- 使用 * only * spark SQL语法,我如何编写一个查询,写入尽可能少的文件,而不创建大量的空/小文件?
- 可能相关:**
- merge-multiple-small-files-into-few-larger-files-in-spark(没有解决方案必须是SparkSQL的限制,并且如上所述,
DISTRIBUTE BY
实际上不起作用) - spark coalesce doesn't work(仅对我们适用,所以这不是问题)
4条答案
按热度按时间nnsrf1az1#
从spark 2.4开始,您可以在查询中添加一个提示来合并和重新划分最终的Select。
这将生成5个文件。
在Spark 2.4之前,可能会影响查询性能,您可以将
spark.sql.shuffle.partitions
设置为所需文件的数量。gkl3eglg2#
(我知道我们可能应该使用DISTRIBUTE BY分区列,但这只是提供了一个最简单的示例)
所以看起来我简化的尝试是我出错的地方。如果我
DISTRIBUTE BY
实际的列而不是人为的1
(即DISTRIBUTE BY load_date
或其他),那么它不会创建空文件。为什么?谁知道呢...(This也匹配merge-multiple-small-files-in-to-few-larger-files-in-spark线程上的this答案)
k5hmc34c3#
这对我来说是一个真实的烦人的问题,我花了一段时间才解决。
以下两种方法对我很有效:
1.在外部将其作为直线脚本运行:
这种方法的问题是,不知何故,它不能在pyspark内部工作,而可以作为外部beeline脚本从python脚本中运行
1.使用重新分区
我发现这个选项相当不错。Repartition(x)允许将pyspark Dataframe 的记录压缩到“x”文件中。
现在,由于不可能用一个静态的数字“x”来重新划分每个表,因为表的大小会变化(例如,我不想将一个有10 mil记录的表重新划分为1),我做了以下操作。
这种方法对我来说很方便。
更进一步,表中的列数和用于每列的数据类型可用于创建权重,该权重可用于获得对给定dataFrame的再划分数的有效得多的估计(例如,与具有相同类型的5列的dataFrame相比,具有20列的dataFrame将获得更高的权重;与类型为Boolean的1列dataFrame相比,类型为Map的1列dataFrame将获得更高的权重
wr98u20j4#
双向
合并比重新划分好。
阅读此主题:https://kontext.tech/article/1155/use-spark-sql-partitioning-hints