map/reduce计数的两阶段排序

hmae6n7t  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(414)

这个python3程序试图使用map/reduce从文本文件中生成单词的频率列表。我想知道如何对单词counts排序,在第二个reducer的yield语句中用'count'表示,以便最大的计数值出现在最后。目前,结果的尾部如下所示:

"0002"  "wouldn"
"0002"  "wrap"
"0002"  "x"
"0002"  "xxx"
"0002"  "young"
"0002"  "zone"

对于上下文,我将任何word文本文件传递到python3程序中,如下所示:

python MapReduceWordFreqCounter.py book.txt

这是你的密码 MapReduceWordFreqCounter.py :

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# ignore whitespace characters

WORD_REGEXP = re.compile(r"[\w']+")

class MapReduceWordFreqCounter(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=self.mapper_make_counts_key,
                   reducer = self.reducer_output_words)
        ]

    def mapper_get_words(self, _, line):
        words = WORD_REGEXP.findall(line)
        for word in words:
            yield word.lower(), 1

    def reducer_count_words(self, word, values):
        yield word, sum(values)

    def mapper_make_counts_key(self, word, count):
        yield str(count).rjust(4,'0'), word

    def reducer_output_words(self, count, words):
        for word in words:
            yield count, word

if __name__ == '__main__':
    MapReduceWordFreqCounter.run()
svujldwt

svujldwt1#

对于mrjob reduce步骤,不希望结果按键“count”排序。
在这里,mrjob导入允许您在本地和aws弹性mapreduce集群上运行代码。mrjob执行起来很繁重,因为它使用yarn api和hadoop流在map和reduce作业之间进行分布式数据传输。
例如,要在本地运行,可以将此mrjob运行为:python mapreducewordfreqcounter.py books.txt>counts.txt
在单个emr节点上运行:python mapreducewordfreqcounter.py-r emr books.txt>counts.txt
在25个emr节点上运行:python mapreducewordfreqcounter.py-r emr--num-ec2-instances=25 books.txt>counts.txt
要对分布式emr作业进行故障排除(替换作业id):python-m mrjob.tools.emr.fetch\u logs--find failure j-1xembeqfdt
在这里,当在四个节点上运行时,简化的结果是有序的,但在输出文件的四个不同部分中。事实证明,强制reducer生成一个有序的文件并没有比只在运行后的作业步骤中对结果进行排序更具性能优势。因此,解决这个特定问题的一种方法是使用linux命令sort:

sort word_frequency_list.txt > sorted_word_frequency_list.txt

产生这些“尾部”结果:
“0970”“of”“1191”“a”“1292”“the”“1420”“your”“1561”“you”“1828”“to”
一般来说,hadoop之上有一些框架非常适合这种处理。对于这个问题,可以使用pig读取处理过的文件并对计数进行排序。
pig可以通过gruntshell或pig脚本运行(使用区分大小写的pig拉丁语法)。pig脚本遵循以下模板:1)load语句读取数据2)一系列“transformation”语句处理数据3)dump/store语句保存结果
要使用pig命令计数:

reducer_count_output = LOAD 'word_frequency_list.txt' using PigStorage('  ') AS (word_count:chararray, word_name:chararray);
counts_words_ordered = ORDER reducer_count_output BY word_count ASC;
STORE counts_words_ordered INTO 'counts_words_ordered' USING PigStorage(':', '-schema');
sc4hvdpw

sc4hvdpw2#

你必须为你的工作设置自定义排序比较器。
如果你用java写的话

job.setSortComparatorClass(SortKeyComparator.class);

你必须提供一个类,给出相反的顺序

public class SortKeyComparator extends Text.Comparator {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return (-1) * super.compare(b1, s1, l1, b2, s2, l2);
    }
}

我猜pythonhadoopapi有一些类似的方法来实现这个技巧。

相关问题