从PySpark RDD中删除重复的元组对

5m1hhzi4  于 9个月前  发布在  Spark
关注(0)|答案(2)|浏览(124)

我给出了一个rdd。示例:test = sc.parallelize([(1,0),(2,0),(3,0)])
我需要得到笛卡尔积并删除结果中有重复条目的元组对。在这个玩具示例中,这些元组对是((1,0),(1,0)),((2,0),(2,0)),((3,0),(3,0))。
我可以得到笛卡尔积如下:* 注意 * collect和print语句只用于故障排除。

def compute_cartesian(rdd):
    result1 = sc.parallelize(sorted(rdd.cartesian(rdd).collect()))
    print(type(result1))
    print(result1.collect())

字符串
我在这个阶段的类型和输出是正确的:

<class 'pyspark.rdd.RDD'>
[((1, 0), (1, 0)), ((1, 0), (2, 0)), ((1, 0), (3, 0)), ((2, 0), (1, 0)), ((2, 0), (2, 0)), ((2, 0), (3, 0)), ((3, 0), (1, 0)), ((3, 0), (2, 0)), ((3, 0), (3, 0))]


但是现在我需要删除三对具有重复条目的元组。
尝试到目前为止:
1..distinct()这会运行,但不会产生正确的结果rdd。
1..dropDuplicates()不会运行。我假设这是.dropDuplicates()的错误用法。
1.手动功能:
如果没有RDD,这个任务很容易。

# Remove duplicates
for elem in result:
    if elem[0] == elem[1]:
        result.remove(elem)
print(result)
print("After: ", len(result))


这是我写的一个函数,它删除重复的元组对,然后吐出结果len,这样我就可以做一个健全性检查。
我只是不确定如何直接在RDD上执行操作,在本例中,删除笛卡尔积产生的任何重复元组对,并返回RDD。
是的,我可以.collect()它,执行操作,然后将其重新输入为RDD,但这违背了目的。假设这是数十亿对。我需要对RDD执行操作并返回一个RDD。

b1zrtrql

b1zrtrql1#

您可以使用filter删除您不需要的对:

dd.cartesian(rdd).filter(lambda x: x[0] != x[1])

字符串
请注意,我不会称这些对为“重复对”,而是“重复对”,或者更好的是“对角对”:如果您将笛卡尔积几何可视化,则它们对应于对角线。
这就是为什么distinctdropDuplicates在这里不合适:它们删除了重复项,这不是您想要的。例如,[1,1,2].distinct()[1,2]

ahy6op9u

ahy6op9u2#

pairs = rdd.flatMap(lambda x:  [(x[0],y) for y in x[1:]])

字符串

相关问题