pyspark rdd filter duplicates

qpgpyjmq  asked on 2021-05-27 in Spark

I have this RDD:

rdd = sc.parallelize([['101,2020-09-04', 30.4, 0.0, 0.0], ['101,2020-09-04', 29.5, 45, 0.0], ['102,2020-09-04', 45.2, 48, 0.0]])

I'm new to Python and PySpark. I know how to solve this with the DataFrame API, but I need a solution that uses only RDD operations.
I'm trying to filter out duplicates based on how many 0.0 values appear in each tuple/row. The first tuple for '101,2020-09-04' contains 0.0 twice, while the second contains it only once, so I want to keep the second tuple here.
My expected output is:

[['101,2020-09-04', 29.5, 45, 0.0], ['102,2020-09-04', 45.2, 48, 0.0]]

I also don't want to count the 0.0 values in every tuple, only when the id is duplicated.

gcxthw6b answered:

Let's work through this.


# First we set up the problem

data = [['101,2020-09-04', 30.4, 0.0, 0.0], ['101,2020-09-04', 29.5, 45, 0.0], ['102,2020-09-04', 45.2, 48, 0.0]]
rdd = sc.parallelize(data)

# Break RDD rows into pairs: the common part to serve as our key, and the rest of the list

pair_rdd = rdd.map(lambda x: (x[0], x[1:]))
pair_rdd.collect()

# [('101,2020-09-04', [30.4, 0.0, 0.0]), ('101,2020-09-04', [29.5, 45, 0.0]), ('102,2020-09-04', [45.2, 48, 0.0])]

Now that the common part is separated out as a key, we can use it with reduceByKey, but first we need a function that compares the list values.


# Comparing lists e.g. [1,0] > [0,1] -> True, because Python compares elements
# left to right. We reverse the lists in the function, so that the
# zeros are on the left-most side:
# [0.0, 0.0, 30.4] < [0.0, 45, 29.5] -> True

def get_most_complete(record1, record2):
  if (record1[::-1] > record2[::-1]):
    return record1
  else:
    return record2

# Now we have our function and can reduceByKey

reduced_pair_rdd = pair_rdd.reduceByKey(lambda x,y: get_most_complete(x,y))

# Then we dismantle the pair into its original form

result = reduced_pair_rdd.map(lambda x: [x[0]] + x[1])
result.collect()

# [['101,2020-09-04', 29.5, 45, 0.0], ['102,2020-09-04', 45.2, 48, 0.0]]

Which is the result you wanted.
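As a quick sanity check of the reversed-list trick, you can run the comparison in plain Python, outside Spark (this is just an illustrative sketch reusing get_most_complete and the two rows for the duplicated '101,2020-09-04' key, not part of the original answer):

# Rows for the duplicated key '101,2020-09-04'
r1 = [30.4, 0.0, 0.0]   # two 0.0 values
r2 = [29.5, 45, 0.0]    # one 0.0 value

# Reversing puts the trailing zeros first, so the row with more zeros
# compares as "smaller" and loses the comparison.
print(r1[::-1] > r2[::-1])        # False -> get_most_complete returns r2
print(get_most_complete(r1, r2))  # [29.5, 45, 0.0]

Note that this comparison relies on the 0.0 columns sitting at the end of each row; if zeros could appear in any position, counting them (e.g. with record.count(0.0)) and keeping the row with the lower count would be a more robust choice.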
