如何在spark中创建分区

kh212irz  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(507)

我正在寻找有关如何在spark中创建分区的详细描述。我假设它是根据集群中可用内核的数量创建的。但举个例子,如果我有512MB的文件需要处理,这个文件将以64MB或128MB的块大小存储在我的存储器(可以是hdfs或s3存储桶)中。对于这种情况,我们可以假设我的集群核心是8。但是,当spark程序处理文件时,分区将如何在这个文件上发生。希望512MB的文件将被分成8个不同的分区,并在8个核心中执行。请对此提出建议。

wd2eg0qa

wd2eg0qa1#

我在filepartition.scala的源代码中找到了一些东西。分区的数量似乎与配置参数“maxsplitbytes”和“fileopencostinbytes”有关

def getFilePartitions(
  sparkSession: SparkSession,
  partitionedFiles: Seq[PartitionedFile],
  maxSplitBytes: Long): Seq[FilePartition] = {
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/**Close the current partition and move to the next. */
def closePartition(): Unit = {
  if (currentFiles.nonEmpty) {
    // Copy to a new Array.
    val newPartition = FilePartition(partitions.size, currentFiles.toArray)
    partitions += newPartition
  }
  currentFiles.clear()
  currentSize = 0
}

val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Assign files to partitions using "Next Fit Decreasing"
partitionedFiles.foreach { file =>
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
closePartition()
partitions.toSeq

}

相关问题