Count the 10 most frequently used words with PySpark

daupos2t · asked 2022-11-16 · in: Apache

I want to write a PySpark snippet that first reads some data, in the form of text files, from a cloud storage bucket. The text files contain paragraphs of text separated by newlines, and within each paragraph the words are separated by spaces.
I need to count the 10 most frequently occurring words in a given text file.

import pyspark
from pyspark import SparkConf, SparkContext
from google.cloud import storage
import sys
conf = SparkConf().setMaster("local").setAppName("some paragraph")
sc = SparkContext(conf=conf)

bucket_name = (sys.argv[1])
destination_blob_name = (sys.argv[2])

storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
 
# download the blob contents and decode them to text
downloaded_blob = blob.download_as_string().decode("utf-8")
print(downloaded_blob)

def words():
    # collect each line of the downloaded text as a list of its words
    lines = {}
    for i, line in enumerate(downloaded_blob.splitlines()):
        fields = line.split(" ")
        print(fields)
        lines[i] = fields
    return lines

myline = words()
myRDD = sc.parallelize([myline])

print(myRDD.collect())
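
For reference, a minimal word-count sketch over the downloaded text might look like the following (it assumes downloaded_blob holds the decoded file contents from the snippet above; names such as lines_rdd are only illustrative):

# split the downloaded text into lines, then into words, and count occurrences
lines_rdd = sc.parallelize(downloaded_blob.splitlines())

top_10 = lines_rdd \
    .flatMap(lambda line: line.split(' ')) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda wc: -wc[1])

print(top_10)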

2g32fytz1#

If you have a file like this:

s = """a b c d e f g h i j
a a b c d e f g h i j k l m n o p"""
with open('file.txt', 'w') as f:
    f.write(s)

You can get the top 10 words into a Python dictionary as follows:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# assumes an active SparkSession; create one if it does not already exist
spark = SparkSession.builder.getOrCreate()

split_on_spaces = F.split('value', ' ')
df = (
    spark.read.text('file.txt')
    .withColumn('value', F.explode(split_on_spaces))
    .groupBy('value').count()
    .orderBy(F.desc('count'))
)
top_val_dict = {r['value']:r['count'] for r in df.head(10)}

print(top_val_dict)
# {'a': 3, 'g': 2, 'e': 2, 'f': 2, 'i': 2, 'h': 2, 'd': 2, 'c': 2, 'j': 2, 'b': 2}

This assumes the simple case you describe, where words are separated only by spaces. In the real world you will probably have to deal with punctuation, and after stripping punctuation you may also want to drop tokens that are not real words, and so on. It is up to you to refine the algorithm further.
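
For instance, a minimal sketch of such a cleanup step (the regular expression below is only an illustrative assumption, not a complete tokenizer) could lowercase the text and strip punctuation before splitting:

from pyspark.sql import functions as F

# lowercase and drop punctuation before splitting on spaces
cleaned = F.regexp_replace(F.lower('value'), r'[^\w\s]', '')
split_on_spaces = F.split(cleaned, ' ')

df = (
    spark.read.text('file.txt')
    .withColumn('value', F.explode(split_on_spaces))
    .filter(F.col('value') != '')   # drop empty tokens left over after cleanup
    .groupBy('value').count()
    .orderBy(F.desc('count'))
)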


bfrts1fy2#

If you want to use RDD transformations, you can build the frequencies with collections.Counter() and then sort them by count.
Here is an example that gets the top 3.

from collections import Counter

from pyspark.sql import SparkSession

# assumes an active SparkSession; create one if it does not already exist
spark = SparkSession.builder.getOrCreate()

data_rdd = spark.sparkContext.textFile('file.txt')

# here file.txt is assumed to contain the single line shown below
data_rdd.collect()
# ['a b c b a d h t t a w b c c c']

data_rdd. \
    map(lambda x: x.split(' ')). \
    map(lambda e: sorted(Counter(e).items(), key=lambda k: k[1], reverse=True)). \
    flatMap(lambda e: e[:3]). \
    collect()
# [('c', 4), ('a', 3), ('b', 3)]

A real-world example, where 3 paragraphs are separated by \n and the words are separated by a space character.

txt = """foo bar bar baz bar\nbaz baz foo foo baz\nunfoo unbaz unbar"""

with open('file.txt', 'w') as f:
    f.write(txt)

# read file as text file
data_rdd = spark.sparkContext.textFile('file.txt')
# spark splits the file on newlines, so each paragraph becomes one record
# ['foo bar bar baz bar', 'baz baz foo foo baz', 'unfoo unbaz unbar']

from collections import Counter

# get top 3 words in the text file
data_rdd. \
    map(lambda x: x.split(' ')). \
    flatMap(lambda x: Counter(x).items()). \
    reduceByKey(lambda c1, c2: c1 + c2). \
    top(3, key=lambda k: k[1])
# [('baz', 4), ('foo', 3), ('bar', 3)]

If you want all the words along with their frequencies, you can sort the words by frequency and simply collect the result.

data_rdd. \
    map(lambda x: x.split(' ')). \
    flatMap(lambda x: Counter(x).items()). \
    reduceByKey(lambda c1, c2: c1 + c2). \
    sortBy(lambda x: x[1], False). \
    collect()

# [('baz', 4), ('foo', 3), ('bar', 3), ('unfoo', 1), ('unbar', 1), ('unbaz', 1)]
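
And since the question asks for the 10 most frequent words rather than 3, the same pipeline works unchanged; only the count passed to top() differs:

# top 10 words in the text file; same pipeline, larger N
data_rdd. \
    map(lambda x: x.split(' ')). \
    flatMap(lambda x: Counter(x).items()). \
    reduceByKey(lambda c1, c2: c1 + c2). \
    top(10, key=lambda k: k[1])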
