Apache Spark csv如何确定读取时的分区数?

q5iwbnjs  于 2023-04-12  发布在  Apache
关注(0)|答案(4)|浏览(246)

在Spark 2.2.0中:我正在阅读一个文件

spark.csv.read("filepath").load().rdd.getNumPartitions

在一个系统中,一个350 MB的文件有77个分区,在另一个系统中有88个分区。对于一个28 GB的文件,我还得到了226个分区,大约是28*1024 MB/128 MB。问题是,Spark CSV数据源如何确定这个默认的分区数量?

aurhwmvo

aurhwmvo1#

分区的数量受多种因素的影响-通常

  • spark.default.parallelism
  • 正在阅读的文件数(如果从目录中读取文件)
  • 影响spark.default.parallelism的群集管理器/内核数量(请参见spark configuration

从文本文件(以及CSV)阅读时的分区数应根据CSVDataSource确定为math.min(defaultParallelism, 2)

um6iljoc

um6iljoc2#

当阅读csv文件(单个大文件或多个小文件,压缩或未压缩)时,我发现spark.sql.files.maxPartitionBytes对结果分区的数量有很大的影响。调整这个值(默认值为128MB,请参阅https://spark.apache.org/docs/latest/sql-performance-tuning.html)对我来说很关键。

aiazj4mn

aiazj4mn3#

从任何文件阅读时的分区数遵循以下公式。
step1:找到文件大小/文件夹大小从指定的路径,我在本地测试.你可以找到根据您的要求(无论是s3/hdfs).

import os
def find_folder_size(path):
    total = 0
    for entry in os.scandir(path):
        if entry.is_file():
            total += entry.stat().st_size
        elif entry.is_dir():
            total += find_folder_size(entry.path)
    return total

步骤2:应用公式

target_partition_size = 200  #100 or 200 depends on your target partition
total_size = find_folder_size(paths)
print('Total size: {}'.format(total_size))
print(int(math.ceil(total_size / 1024.0 / 1024.0 / float(target_partition_size))))
num_partitions = int(math.ceil(total_size / 1024.0 / 1024.0 / float(target_partition_size)))
PARTITION_COLUMN_NAME = ['a','c']
df = df.repartition(num_partitions, PARTITION_COLUMN_NAME)
or 
df = df.repartition(num_partitions)

我们可以申请大数据/小数据来获得分区数。

bttbmeg0

bttbmeg04#

根据我的经验,这取决于spark.default.parallelism

**场景一:**文件大小:75MB默认并行度:8

>>> sc.defaultParallelism
8
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
8

场景:2文件大小:75MB默认并行度:10

>>> sc.defaultParallelism
10
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
10

场景三文件大小:75MB默认并行度:4

>>> sc.defaultParallelism
4
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
4

**场景四:**文件大小:75MB默认并行度:100

>>> sc.defaultParallelism
100
>>> booksDF = spark.read.option("inferSchema","true").option("header","true").csv("file:///C:\\Users\\Sandeep\\Desktop\\data\\Turkish_Book_Dataset_Kaggle_V2.csv")
>>> booksDF.rdd.getNumPartitions()
18
  • 在场景4中,它将数据划分为可能数量的分区,即18*

基于此,我推断,初始数量取决于spark.default.parallelism的值。
如果spark.default.parallelism设置为更高的数字,它只会根据哈希创建可能数量的分区。

相关问题