我使用Spark 2。1在本地模式下运行这个简单的应用程序。
val N = 10 << 20
sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
这里,range(N)创建了一个 Long 的数据集(具有唯一值),因此我假设
- df 1 = N * 8字节~ 80 MB
- df 2 = N / 5 * 8字节~ 16 MB
现在让我们以df 1为例。df 1由8个分区组成,shuffledRDD为5,因此我假设
- Map器数量(M)= 8
- 减速器数量(R)= 5
由于分区数很低,Spark将使用Hash Shuffle,这将在磁盘中创建M * R个文件,但我不明白是否每个文件都有所有数据,因此each_file_size = data_size导致M * R * data_size文件或all_files = data_size。
然而,当执行此应用程序时,shuffle写入df 1 = 160 MB,这与上述任何一种情况都不匹配。
Spark UI
我错过了什么?为什么混洗写数据的大小翻了一番?
1条答案
按热度按时间iq0todco1#
首先,让我们看看
data size total(min, med, max)
是什么意思:根据SQLMetrics。scala#L88和ShuffleExchange。scala#L43,我们看到的
data size total(min, med, max)
是shuffle的dataSize
度量的最终值。那么,它是如何更新的呢?每次序列化记录时都会更新它:UnsafeRowSerializerscala#L66通过dataSize.add(row.getSizeInBytes)
(UnsafeRow
是Spark SQL中记录的内部表示)。在内部,
UnsafeRow
由byte[]
支持,并在序列化期间直接复制到底层输出流,其getSizeInBytes()
方法仅返回byte[]
的长度。因此,初始问题转化为:为什么字节表示是记录中唯一的long
列的两倍大?这个不安全的。scala doc给了我们答案:每个元组有三个部分:[空位集] [值] [可变长度部分]
该位集用于空值跟踪,并与8字节字边界对齐。每个字段存储一位。
因为它是8字节字对齐的,所以唯一的1个空位占用另一个8字节,与长列相同的宽度。因此,每个
UnsafeRow
表示使用16个字节的一长列行。