限制大型rdd

9jyewag0  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(332)

我正在阅读许多图片,我想在其中的一小部分的发展工作。因此,我试图理解spark和python是如何做到这一点的:

In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'

In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43

In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...

…那么发生了什么?我希望limit()的运行速度比我们在 [2] ,但事实并非如此。
下面我将描述我的理解,请纠正我,因为很明显我遗漏了一些东西: d 是成对的rdd(我从模式中知道这一点),我用map函数说:
i) 把每一对都拿走(它们将被命名为 x 把钱还给我 photo_id 属性)。
ii)这将导致一个新的(匿名)rdd,我们在其中应用 first() 方法,我不确定它是如何工作的$,但应该给我匿名rdd的第一个元素。
[3] ,我们限制 d rdd为1,这意味着 d 有许多元素,仅使用1,并将map函数仅应用于该元素。这个 Out [3] 应该是由Map创建的rdd。
[4] ,我希望遵循 [3] 只需打印有限rdd的唯一元素。。。
正如预期的那样,在查看了监视器之后,[4]似乎处理了整个数据集,而其他数据集则没有,因此我似乎没有使用 limit() 对,或者那不是我想要的:

编辑:

tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()

第一个将给出一个 PipelinedRDD ,正如这里所描述的,它实际上不会做任何动作,只是一个转换。
但是,第二行还将处理整个数据集(事实上,现在的任务数和以前一样多,加上一个!)。

  • [2] 在[4]仍在运行且超过3小时后立即执行。。

$因为名字的关系,我在文档中找不到它。

rseugnpd

rseugnpd1#

根据您的代码,下面是spark 2.0上更简单的测试用例

case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line

实际上,dataset.first相当于dataset.limit(1).collect,所以检查这两种情况的物理计划:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]

对于第一种情况,它与collectlimitexec物理运算符中的优化有关。也就是说,它将首先获取第一个分区以获取限制行数,在本例中为1,如果不满足,则获取更多分区,直到达到所需的限制为止。所以一般来说,如果第一个分区不是空的,那么只计算和获取第一个分区。其他分区甚至不会被计算。
但是,在第二种情况下,collectlimitexec中的优化没有帮助,因为前面的限制操作涉及一个shuffle操作。将计算所有分区,并对每个分区运行locallimit(1)以获得一行,然后将所有分区洗牌到单个分区中。collectlimitexec将从生成的单个分区中获取1行。

相关问题