加载parquet文件并保持相同数量的hdfs分区

7jmck4yq  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(669)

我有一个Parquet锉刀 /df 用120个分区保存在hdfs中。hdfs上每个分区的大小约为43.5米。
总尺寸

hdfs dfs -du -s -h /df
5.1 G  15.3 G  /df
hdfs dfs -du -h /df
43.6 M  130.7 M  /df/pid=0
43.5 M  130.5 M  /df/pid=1
...
43.6 M  130.9 M  /df/pid=119

我想将该文件加载到spark并保持相同的分区数。但是,spark会自动将文件加载到60个分区中。

df = spark.read.parquet('df')
df.rdd.getNumPartitions()
60

hdfs设置: 'parquet.block.size' 未设置。

sc._jsc.hadoopConfiguration().get('parquet.block.size')

不返回任何内容。
“dfs.blocksize”设置为128。

float(sc._jsc.hadoopConfiguration().get("dfs.blocksize"))/2**20

退货

128

将这些值中的任何一个更改为较低的值都不会导致parquet文件加载到hdfs中相同数量的分区中。
例如:

sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 64*2**20)
sc._jsc.hadoopConfiguration().setInt("dfs.blocksize", 64*2**20)

我知道43.5米远低于128米。但是,对于这个应用程序,我将立即完成许多转换,这些转换将导致120个分区中的每个分区更接近128m。
我正在努力避免自己必须在加载后在应用程序中重新分区。
有没有办法强制spark用hdfs上存储的相同数量的分区加载parquet文件?

v7pvogib

v7pvogib1#

首先,我将从检查spark如何将数据分割成分区开始。默认情况下,它取决于数据和集群的性质和大小。本文将为您提供Dataframe加载到60个分区的原因:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/sparksqlshufflepartitions_draft.html
一般来说,它的催化剂负责所有的优化(包括分区的数量),所以除非有足够的理由进行自定义设置,否则我会让它完成它的工作。如果您使用的任何转换都是广域的,那么spark仍将洗牌数据。

4ioopgfo

4ioopgfo2#

我可以使用 spark.sql.files.maxPartitionBytes 属性将分区大小保留在导入时所需的位置。
的其他配置选项文档 spark.sql.files.maxPartitionBytes 属性状态:
读取文件时要打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如parquet、json和orc)时有效。
示例(其中 spark 是一个工作 SparkSession ):

spark.conf.set("spark.sql.files.maxPartitionBytes", 67108864) ## 64Mbi

为了在转换期间控制分区的数量,我可以设置 spark.sql.shuffle.partitions ,文件中规定:
配置为联接或聚合洗牌数据时要使用的分区数。
示例(其中 spark 是一个工作 SparkSession ):

spark.conf.set("spark.sql.shuffle.partitions", 500)

另外,我可以设置 spark.default.parallelism ,其执行行为文档说明:
当用户未设置时,join、reducebykey和parallelize等转换返回的RDD中的默认分区数。
示例(其中 spark 是一个工作 SparkSession ):

spark.conf.set("spark.default.parallelism", 500)

相关问题