Scala: groupByKey-like split of an RDD[(K, V)] returning List[(K, RDD[V])]

gcmastyq · asked 2021-05-27 · in Spark

I would like to split an RDD[(K, V)] into buckets, so that the output type is List[(K, RDD[V])]. Below is my proposal, but I am not satisfied with it, because it requires keysNumber passes over the original RDD. Is there another approach that needs fewer scans of the original data? If not, what do you think about caching rest before the recursive call? It would certainly be faster, but would Spark minimize the memory footprint thanks to the lineage back to the first RDD, or would it end up storing roughly keysNumber progressively smaller copies of the original RDD? Thank you.

def kindOfGroupByKey[K : ClassTag, V : ClassTag](rdd: RDD[(K, V)], keys: List[K] = List.empty[K]): List[(K, RDD[V])] = {

    val keysIn: List[K] = if (keys.isEmpty) rdd.map(_._1).distinct.collect.toList else keys

    @annotation.tailrec
    def go(rdd2: RDD[(K, V)], keys: List[K], output: List[(K, RDD[V])]): List[(K, RDD[V])] = {

        keys match {
            case Nil => output.reverse
            case currentKey :: keyxs =>
                val filtered = rdd2.filter(_._1 == currentKey)
                val rest = rdd2.filter(_._1 != currentKey)

                val updatedOutput = (currentKey, filtered.map(_._2)) :: output

                // Supposing rdd is cached, is it good to cache rest here, or would that
                // generate many smaller cached copies of rdd and risk overloading RAM?
                go(rest, keyxs, updatedOutput)
        }

    }

    go(rdd, keysIn, List.empty[(K, RDD[V])])

}
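The recursion can be exercised without a SparkContext by substituting plain `List` for `RDD`. The sketch below is an illustrative analogue only (the names `bucket` and `go` are mine, not from the question): it mirrors the same one-key-per-pass structure, using `partition` in place of the two `filter` calls, and shows the bucket shape the function is meant to produce.

```scala
// Plain-Scala analogue of kindOfGroupByKey: split a list of pairs into
// per-key buckets, removing each key's pairs from the remainder as we go.
def bucket[K, V](pairs: List[(K, V)], keys: List[K]): List[(K, List[V])] = {
  @annotation.tailrec
  def go(rest: List[(K, V)], ks: List[K], out: List[(K, List[V])]): List[(K, List[V])] =
    ks match {
      case Nil => out.reverse
      case k :: tail =>
        // One "pass" over the remaining data per key, like the two RDD filters.
        val (matched, remaining) = rest.partition(_._1 == k)
        go(remaining, tail, (k, matched.map(_._2)) :: out)
    }
  go(pairs, keys, Nil)
}

val data = List(("a", 1), ("b", 2), ("a", 3))
val buckets = bucket(data, List("a", "b"))
// buckets == List(("a", List(1, 3)), ("b", List(2)))
```

Unlike the RDD version, each pass here actually shrinks the in-memory remainder; with RDDs, each `rest` is a lazy lineage step, which is exactly why the caching question above matters.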

No answers yet.
