Apache Spark - shuffle写入的数据超过输入数据的大小

o8x7eapl  于 2023-05-01  发布在  Apache
关注(0)|答案(1)|浏览(218)

我使用Spark 2。1在本地模式下运行这个简单的应用程序。

val N = 10 << 20

sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")

df1.join(df2, col("k1") === col("k2")).count()

这里,range(N)创建了一个 Long 的数据集(具有唯一值),因此我假设

  • df 1 = N * 8字节~ 80 MB
  • df 2 = N / 5 * 8字节~ 16 MB

现在让我们以df 1为例。df 1由8个分区组成shuffledRDD为5,因此我假设

  • Map器数量(M)= 8
  • 减速器数量(R)= 5

由于分区数很低,Spark将使用Hash Shuffle,这将在磁盘中创建M * R个文件,但我不明白是否每个文件都有所有数据,因此each_file_size = data_size导致M * R * data_size文件或all_files = data_size
然而,当执行此应用程序时,shuffle写入df 1 = 160 MB,这与上述任何一种情况都不匹配。
Spark UI
我错过了什么?为什么混洗写数据的大小翻了一番?

iq0todco

iq0todco1#

首先,让我们看看data size total(min, med, max)是什么意思:
根据SQLMetrics。scala#L88和ShuffleExchange。scala#L43,我们看到的data size total(min, med, max)是shuffle的dataSize度量的最终值。那么,它是如何更新的呢?每次序列化记录时都会更新它:UnsafeRowSerializerscala#L66通过dataSize.add(row.getSizeInBytes)UnsafeRow是Spark SQL中记录的内部表示)。
在内部,UnsafeRowbyte[]支持,并在序列化期间直接复制到底层输出流,其getSizeInBytes()方法仅返回byte[]的长度。因此,初始问题转化为:为什么字节表示是记录中唯一的long列的两倍大?这个不安全的。scala doc给了我们答案:
每个元组有三个部分:[空位集] [值] [可变长度部分]
该位集用于空值跟踪,并与8字节字边界对齐。每个字段存储一位。
因为它是8字节字对齐的,所以唯一的1个空位占用另一个8字节,与长列相同的宽度。因此,每个UnsafeRow表示使用16个字节的一长列行。

相关问题