我正在阅读许多图片,我想在其中的一小部分的发展工作。因此,我试图理解spark和python是如何做到这一点的:
In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'
In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43
In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...
…那么发生了什么?我希望limit()的运行速度比我们在 [2]
,但事实并非如此。
下面我将描述我的理解,请纠正我,因为很明显我遗漏了一些东西: d
是成对的rdd(我从模式中知道这一点),我用map函数说:
i) 把每一对都拿走(它们将被命名为 x
把钱还给我 photo_id
属性)。
ii)这将导致一个新的(匿名)rdd,我们在其中应用 first()
方法,我不确定它是如何工作的$,但应该给我匿名rdd的第一个元素。
在 [3]
,我们限制 d
rdd为1,这意味着 d
有许多元素,仅使用1,并将map函数仅应用于该元素。这个 Out [3]
应该是由Map创建的rdd。
在 [4]
,我希望遵循 [3]
只需打印有限rdd的唯一元素。。。
正如预期的那样,在查看了监视器之后,[4]似乎处理了整个数据集,而其他数据集则没有,因此我似乎没有使用 limit()
对,或者那不是我想要的:
编辑:
tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()
第一个将给出一个 PipelinedRDD
,正如这里所描述的,它实际上不会做任何动作,只是一个转换。
但是,第二行还将处理整个数据集(事实上,现在的任务数和以前一样多,加上一个!)。
- [2] 在[4]仍在运行且超过3小时后立即执行。。
$因为名字的关系,我在文档中找不到它。
1条答案
按热度按时间rseugnpd1#
根据您的代码,下面是spark 2.0上更简单的测试用例
实际上,dataset.first相当于dataset.limit(1).collect,所以检查这两种情况的物理计划:
对于第一种情况,它与collectlimitexec物理运算符中的优化有关。也就是说,它将首先获取第一个分区以获取限制行数,在本例中为1,如果不满足,则获取更多分区,直到达到所需的限制为止。所以一般来说,如果第一个分区不是空的,那么只计算和获取第一个分区。其他分区甚至不会被计算。
但是,在第二种情况下,collectlimitexec中的优化没有帮助,因为前面的限制操作涉及一个shuffle操作。将计算所有分区,并对每个分区运行locallimit(1)以获得一行,然后将所有分区洗牌到单个分区中。collectlimitexec将从生成的单个分区中获取1行。