pyspark Spark'极限'不并行运行?

b91juud3  于 2023-11-16  发布在  Spark
关注(0)|答案(3)|浏览(148)

我有一个简单的join,我在其中限制了两侧。在explain计划中,我看到在执行limit之前有一个ExchangeSingle操作,实际上我看到在这个阶段集群中只有一个任务在运行。
这当然会极大地影响性能(删除限制会消除单个任务的瓶颈,但会延长连接,因为它适用于更大的数据集)。
极限真的不能并行化吗?如果是的话,有没有解决办法?
我在Databricks集群上使用spark。
编辑:关于可能的重复。答案并没有解释为什么所有的东西都被混洗到一个单独的分区中。还有-我询问了解决这个问题的建议。

velaa5lx

velaa5lx1#

根据用户8371915在评论中给出的建议,我使用了sample而不是limit。它打开了瓶颈。
一个小而重要的细节:在sample之后,我仍然必须对结果集设置一个可预测的大小约束,但是sample输入的是一个分数,因此结果集的大小可以很大程度上取决于输入的大小。
对我来说幸运的是,使用count()运行相同的查询非常快,所以我首先计算了整个结果集的大小,并使用它来计算我后来在样本中使用的分数。

dddzy1tm

dddzy1tm2#

限制后并行化的解决方法:.repartition(200)
这将再次重新分配数据,以便您可以并行工作。

2skhul33

2skhul333#

回复

Spark的极限并不平行。

原因

Spark中有一些物理操作符用于限制逻辑:

  1. CollectLimitExec:
  • 将数据收集到单个分区,不并行工作,而是“增量执行限制”。
  • 仅当逻辑限制操作是逻辑计划中的最后一个操作符时使用,这在用户将结果收集回驱动程序时发生。例如:spark.sql("select * from x limit 100").collect()
  1. LocalLimitExec和GlobalLimitExec
  • 他们一起工作
    LocalLimitExec:取每个子分区的第一个limit元素,但不进行收集和 Shuffle 。
    GlobalLimitExec:取子级单个输出分区的第一个limit元素。

他们之间有一个交换( Shuffle )。

  • 全局限制步骤,在单个分区中工作,而不是并行

1.其他Exec:CollectTailExec / TakeOrderedAndProjectExec

解决方案

随机

SELECT * FROM test TABLESAMPLE (50 PERCENT)

select * from x where rand() < 0.01

df.sample(0.01)  or  rdd.sample(0.01)

字符串

mapPartitions和take

直接服用

df.mapPartitions((a)=>a.take(2853557))


在countByPartitions之后取

//  ①  统计每个分区内行数 / countByPartitions
    val x = df5.mapPartitions((a) => {
      val pid = TaskContext.getPartitionId()
      Iterator((pid, a.size))
    })
    val countByPart = x.collectAsList()
    print(countByPart) //[(0,400), (1,400), (2,400), (3,400), (4,400)]

   //  ②  分配各分区应该take的数量. / allocate
    var limit = 900
    val takeByPart = new Array[Int](countByPart.size)
    for (a <- 0 until countByPart.size) {
      val take = if (limit > 0) {
        Math.min(limit, countByPart.get(a)._2)
      } else {
        0
      }
      limit = limit - take
      takeByPart(a) = take
    }
    print(takeByPart.mkString("(", ", ", ")")) //(400, 400, 100, 0, 0)
    val takeByPartBC = spark.sparkContext.broadcast(takeByPart)

   //  ③  分区take结果
    val result = df5.mapPartitions((a) => {
      val pid = TaskContext.getPartitionId()
      val take = takeByPartBC.value(pid)
      a.take(take)
    })
    assert(result.count() == 900)


转到my blog for further reading

相关问题