我有一个任务,在spark中有一个rdd,记录如下:
[(id, group), {'token1', 'token2'...}]
例如,''tokenizedtweetsbyuser.take(5)''提供:
[(('470520068', 3), {'#berniesanders', '#goldmansachs', '$', '.', '/', '4', 'a', 'adorned', 'bc', 'capitalist', 'class', "doesn't", 'he', "i'm", 'pig', 'ride', 'rigged', 'system', 'voting', 'w', 'war'}), (('2176120173', 6), {'!', '#america', '#trump', '#votetrump', '&', '.', ':', ';', '@realdonaldtrump', '@trumpnewmedia', 'amp', 'change', "don't", 'get', 'htt', 'if', "it's", 'nothing', 'out', 'rt', 'simple', 'that', 'will', 'you', '…'}), (('145087572', 3), {'!', '#colorado', '#denver', '%', ',', '-', '.', '1', '11am', '1pm', ':', '@allonmedicare', '@berniesanders', '@libertea2012', '@rockportbasset', 'america', 'and', 'capitol', 'co', 'endorse', 'for', 'herself', 'hillary', 'http', 'icymi', 'in', 'is', 'leading', 'liar', 'mst', 'only', 'out', 'positive', 'progressive', 'proof', 'rt', 's', 'state', 'that', 'the', 'to', 'today', 'voices', 'wake-up', 'weasel', '’', '…'}), (('23047147', 6), {'@madworldnews', '[', ']', 'after', 'bernie', 'deal', 'fans', 'had', 'liberal', 'pour', 'supporter', 'tears', 'to', 'trump', 'via', 'vid', 'with'}), (('526506000', 4), {'.', ':', '@justinamash', '@tedcruz', 'calls', 'candidate', 'cartel', 'correctly', 'he', 'i', 'is', 'on', 'only', 'remaining', 'rt', 'take', 'the', 'to', 'trust', 'washington', 'what', '…'})]
这些令牌来自tweets和前100个令牌的列表,我需要计算每个组找到多少令牌。共有8组。
我的实现非常简单:
tokenizedTweetsByUser.cache()
groupCounts = []
for i in range(8):
groupCounts.append([])
for token in tokensList:
#the following statement take too long!
item_count = tokenizedTweetsByUser.filter(lambda x: (x[0][1] == i) and (token in x[1])).count()
if item_count > 0:
groupCounts[i].append((token, item_count))
但这太耗时了。我知道filter.count将运行800次,但由于它只是一个filter count,我们正在寻找一个集合中的令牌,因此我希望它的性能相当好。
有人能提出另一种更有效的方法吗?
1条答案
按热度按时间rpppsulh1#
似乎reducebykey比在filter.count的许多示例上循环要高效得多。
在本例中,将组和列表项组合成一个字符串,该字符串将作为键,以便每个列表项都是一个单独的行。然后执行reducebykey:
速度快了两倍。