scala 使用Spark Distinct

u0njafvf  于 2023-10-18  发布在  Scala
关注(0)|答案(1)|浏览(113)

我是Spark和Scala的新手。我正在阅读关于Spark的distinct()函数。但我找不到任何合适的细节。我有几个疑问,我不能解决,并已写下来。
1.如何在Spark中实现distinct()?
我对Spark源代码不太在行,无法识别整个流程。当我检查执行计划时,我只能看到ShuffleRDD

  1. distinct的时间复杂度是多少?
    我还从谷歌搜索中发现,它也以某种方式使用哈希和排序。
    所以,我想它是否使用了与Hashset帮助下从数组中获取唯一元素相同的原理。如果是一个系统,我会猜测时间复杂度是O(nlogn)。
    但它是分布在许多分区和 Shuffle ,将是什么顺序的时间复杂度?
    1.有没有办法避免在特定情况下 Shuffle ?
    如果我确保根据我的用例正确分区我的数据,我可以避免 Shuffle 吗?
    也就是说,比如说,用唯一的行分解一个ArrayType列会创建新的行,而其他列会被复制。我将选择其他栏目。通过这种方式,我确保每个分区的副本是唯一的。因为我知道每个分区的重复是唯一的,所以我可以避免 Shuffle ,而只是敏锐地删除该分区中的重复
    我还找到了这个Does spark's distinct() function shuffle only the distinct tuples from each partition
    谢谢你的帮助。如果我哪里错了,请纠正我。
py49o6xq

py49o6xq1#

如何在Spark中实现distinct()?
通过应用具有None值的虚拟聚合。大致

rdd.map((_, None)).reduceByKey((a, b) => a)

distinct的时间复杂度是多少?
鉴于整个过程的复杂性,很难估计。它至少是O(N log N),因为shuffle需要排序,但是考虑到构建额外的非核心数据结构(包括关联数组)所需的多个其他操作,串行化/并行化数据可能会更高,并且在实践中由IO操作主导,而不是纯粹的算法复杂度。
有没有办法避免在特定情况下 Shuffle ?
是的,如果保证将潜在的重复放在同一个分区上。
您可以使用mapPartitions来消除重复数据,特别是如果数据已排序或以其他方式保证在孤立的邻域中有重复数据。如果没有这个,你可能会受到内存需求的限制,除非你接受概率过滤器(如布隆过滤器)的近似结果。
一般来说,这是不可能的,像这样的操作将是非本地的。

相关问题