SELECT * FROM test TABLESAMPLE (50 PERCENT)
select * from x where rand() < 0.01
df.sample(0.01) or rdd.sample(0.01)
字符串
mapPartitions和take
直接服用
df.mapPartitions((a)=>a.take(2853557))
型 在countByPartitions之后取
// ① 统计每个分区内行数 / countByPartitions
val x = df5.mapPartitions((a) => {
val pid = TaskContext.getPartitionId()
Iterator((pid, a.size))
})
val countByPart = x.collectAsList()
print(countByPart) //[(0,400), (1,400), (2,400), (3,400), (4,400)]
// ② 分配各分区应该take的数量. / allocate
var limit = 900
val takeByPart = new Array[Int](countByPart.size)
for (a <- 0 until countByPart.size) {
val take = if (limit > 0) {
Math.min(limit, countByPart.get(a)._2)
} else {
0
}
limit = limit - take
takeByPart(a) = take
}
print(takeByPart.mkString("(", ", ", ")")) //(400, 400, 100, 0, 0)
val takeByPartBC = spark.sparkContext.broadcast(takeByPart)
// ③ 分区take结果
val result = df5.mapPartitions((a) => {
val pid = TaskContext.getPartitionId()
val take = takeByPartBC.value(pid)
a.take(take)
})
assert(result.count() == 900)
3条答案
按热度按时间velaa5lx1#
根据用户8371915在评论中给出的建议,我使用了sample而不是limit。它打开了瓶颈。
一个小而重要的细节:在sample之后,我仍然必须对结果集设置一个可预测的大小约束,但是sample输入的是一个分数,因此结果集的大小可以很大程度上取决于输入的大小。
对我来说幸运的是,使用count()运行相同的查询非常快,所以我首先计算了整个结果集的大小,并使用它来计算我后来在样本中使用的分数。
dddzy1tm2#
限制后并行化的解决方法:.repartition(200)
这将再次重新分配数据,以便您可以并行工作。
2skhul333#
回复
Spark的极限并不平行。
原因
Spark中有一些物理操作符用于限制逻辑:
spark.sql("select * from x limit 100").collect()
LocalLimitExec:取每个子分区的第一个limit元素,但不进行收集和 Shuffle 。
GlobalLimitExec:取子级单个输出分区的第一个limit元素。
他们之间有一个交换( Shuffle )。
1.其他Exec:CollectTailExec / TakeOrderedAndProjectExec
解决方案
随机
字符串
mapPartitions和take
直接服用
型
在countByPartitions之后取
型
转到my blog for further reading