Filter out non-numeric values in a PySpark RDD

qojgxg4l · asked 2021-05-27 · in Spark

I have an RDD that looks like this:

[["3331/587","Metro","1235","1000"],
["1234/232","City","8479","2000"],
["5987/215","Metro","1111","Unkown"],
["8794/215","Metro","1112","1000"],
["1254/951","City","6598","XXXX"],
["1584/951","City","1548","Unkown"],
["1833/331","Metro","1009","2000"],
["2213/987","City","1197", ]]

For each distinct value in the second field (City/Metro), I want to compute the average and the maximum of the last value in each row (1000, 2000, etc.). I use the following code to collect the 'City' values:

rdd.filter(lambda row: row[1] == 'City').map(lambda x: float(x[3])).collect()

However, I get an error, presumably because of the string values in that column ("Unkown", for example).
How can I filter out rows containing strings or missing values (i.e. keep only rows whose last field converts to a number), and then compute the max and average?
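One general way to keep only convertible values is a try/except wrapper around `float()`, which also accepts decimals and negatives that `isdigit()` would reject. A minimal sketch; the helper name `to_float` and the commented Spark usage are assumptions, not code from the question:

```python
# Hypothetical helper: return the parsed number, or None if the
# string (or a missing value) cannot be converted.
def to_float(s):
    try:
        return float(s)
    except (TypeError, ValueError):
        return None

# In Spark it could be used like this (names assumed):
# city_vals = (rdd.filter(lambda r: len(r) > 3 and r[1] == 'City')
#                 .map(lambda r: to_float(r[3]))
#                 .filter(lambda v: v is not None))

print(to_float("1000"))    # 1000.0
print(to_float("Unkown"))  # None
```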

7nbnzgx9

Try this:

rdd = rdd.map(lambda l: [l[i].replace('"', '') for i in range(0, len(l))])
rdd = rdd.filter(lambda l: len(l) > 3) \
    .filter(lambda l: l[1] in ['City', 'Metro']) \
    .filter(lambda l: l[3].isdigit()) \
    .map(lambda l: (l[1], int(l[3])))

# Per key: build (sum, count) pairs, merge them across partitions,
# then divide to get the average.
rdd_avg = rdd.aggregateByKey((0, 0),
                             lambda a, b: (a[0] + b, a[1] + 1),
                             lambda a, b: (a[0] + b[0], a[1] + b[1])) \
             .mapValues(lambda x: x[0] / x[1])
rdd_max = rdd.reduceByKey(lambda a, b: a if a > b else b)

print(rdd_avg.collect())
print(rdd_max.collect())

[('Metro', 1333.3333333333333), ('City', 2000.0)]
[('Metro', 2000), ('City', 2000)]
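The same filtering and aggregation logic can be checked in plain Python against the sample data, without a Spark cluster. A sketch for verification only; the dictionary bookkeeping stands in for `aggregateByKey`/`reduceByKey`:

```python
# Plain-Python check: same len/isdigit filters, then per-key average and max.
rows = [
    ["3331/587", "Metro", "1235", "1000"],
    ["1234/232", "City", "8479", "2000"],
    ["5987/215", "Metro", "1111", "Unkown"],
    ["8794/215", "Metro", "1112", "1000"],
    ["1254/951", "City", "6598", "XXXX"],
    ["1584/951", "City", "1548", "Unkown"],
    ["1833/331", "Metro", "1009", "2000"],
    ["2213/987", "City", "1197"],           # short row, dropped by len check
]

pairs = [(r[1], int(r[3])) for r in rows if len(r) > 3 and r[3].isdigit()]

sums, counts, maxes = {}, {}, {}
for key, val in pairs:
    sums[key] = sums.get(key, 0) + val      # running (sum, count) per key
    counts[key] = counts.get(key, 0) + 1
    maxes[key] = max(maxes.get(key, val), val)

avgs = {k: sums[k] / counts[k] for k in sums}
print(avgs)   # {'Metro': 1333.3333333333333, 'City': 2000.0}
print(maxes)  # {'Metro': 2000, 'City': 2000}
```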
