我用spark写了一个自定义的RDD提供程序,并在SQL查询下测试了它的执行时间。我不确定这个提供程序的内部机制与这个问题有关,但基本上它用于从本地和远程文件读取数据。
问题:使用distinct
关键字运行查询(无论是否用于聚合查询)对查询的性能有很大的影响。
范例:
查询:select sourceip, sourceport, destinationport from table where destinationport=80
执行时间:31527 ms
查询:select distinct sourceip, sourceport, destinationport from events where destinationport=80
执行时间:57856 ms
这个结果是针对非常少量的数据,当我尝试在中等大小的数据集上运行它时,Spark崩溃了“太多打开的文件”。
日志:
/tmp/spark-a47 f859 b-2a 1f-4466-8333- 0 bf 40 c3968 eb/executor-9 c98264 a-23 a8 - 49 b8-ab 6 f-ddf 7349 e0155/blockmgr-b73 fc 639 -8705-4956-8652-e7300 b35527 a/3f/temp_shuffle_b4afe57f-9db 1 -4653- 91 c 0 - 22 d207933748(打开的文件太多)
从长远来看,使用distinct
会用5分钟的数据压缩集群,而不使用distinct
,服务器可以成功运行2天的查询。
任何想法可能导致这个问题吗?
P.S.我验证了打开文件的数量及其限制使用:
打开文件:lsof | wc -l
导致约1.4M
最大打开文件:cat /proc/sys/fs/file-max
导致9- 42 M(取决于机器-主有9 M)
2条答案
按热度按时间6qftjkof1#
以下两个步骤可以帮助您调试问题:
1)distinct()肯定是在分区之间进行 Shuffle 。要了解更多情况,请在RDD?上运行. toString。
2)你能检查你的工作是正确的并行使用Spark UI检查?
同时检查你的分区是否倾斜,here is link for further reading
zpgglvta2#
当然,如果你的查询中有不同的内容,你的查询速度会变慢。
这里是一个简单的选择过滤查询。所以它不需要任何shuffle。Spark将执行 predicate 下推并仅使用过滤操作给予结果
而
这里你有distinct.它本质上要求spark做shuffle。它首先过滤出结果,然后计算每个结果的哈希值,然后进行reduce操作以删除重复的结果(相同的哈希多行归结为一行)
现在shuffle是一个相对昂贵的操作,因为它需要通过网络发送数据,因此第二个查询将比前一个查询慢得多