Identifying keys to process in Hadoop MapReduce

w6lpcovy posted on 2021-06-01 in Hadoop

I have two keys coming out of my map function: NY and Others. So the key-value output of my mapper is either NY 1 or Other 1; only these two cases occur.
My map function:


    #!/usr/bin/env python

    import sys
    import csv

    reader = csv.reader(sys.stdin, delimiter=',')
    for entry in reader:
        if len(entry) == 22:
            registration_state = entry[16]
            print('{0}\t{1}'.format(registration_state, 1))
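
For reference, a minimal local check of the mapper logic; the sample record below is hypothetical, and only the 22-column width and the registration state in column index 16 matter:

    import csv
    import io

    # Hypothetical 22-column record; column index 16 holds the registration state.
    fields = ['x'] * 22
    fields[16] = 'NY'

    reader = csv.reader(io.StringIO(','.join(fields) + '\n'), delimiter=',')
    for entry in reader:
        if len(entry) == 22:
            print('{0}\t{1}'.format(entry[16], 1))   # prints: NY<TAB>1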

Now I need a reducer to process the map output. My reducer:


#!/usr/bin/env python

import sys

ny = 0
other = 0

# Input comes from STDIN (the stream of map output lines)
for line in sys.stdin:

    # Remove leading and trailing whitespace
    line = line.strip()

    # Split the line into key and value (the mapper always emits 1 as the value)
    key, value = line.split('\t', 1)
    value = int(value)

    # Accumulate counts for the two possible keys: 'NY' and everything else
    if key == 'NY':
        ny = ny + value
    else:
        other = other + value

# Output the totals for both keys
print('{0}\t{1}'.format('NY', ny))
print('{0}\t{1}'.format('Other', other))
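
To see what this reducer prints, you can drive the same counting loop with a few simulated map-output lines (the records here are made up):

    import io

    ny = 0
    other = 0

    # Simulated map output; in the real job these lines arrive on stdin from Hadoop.
    for line in io.StringIO('NY\t1\nNJ\t1\nNY\t1\n'):
        key, value = line.strip().split('\t', 1)
        if key == 'NY':
            ny = ny + int(value)
        else:
            other = other + int(value)

    print('{0}\t{1}'.format('NY', ny))        # NY     2
    print('{0}\t{1}'.format('Other', other))  # Other  1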

From these results, MapReduce produces two output files, each containing counts for NY and Other: for example, one contains NY 1248, Other 4677, and the other NY 0, Other 1000. This happens because the map output is split across two reducers, each producing its own result file, and the final output is obtained by merging them.
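
For context on how keys are split across reducers: the default partitioner assigns each record to the partition given by hashing the key modulo the number of reduce tasks, so all records with the same key always reach the same reduce task. A rough Python sketch of the idea (Hadoop actually hashes the key text on the Java side, so the exact values below are only illustrative):

    NUM_REDUCERS = 2

    def java_string_hashcode(s):
        # Approximation of Java's String.hashCode(), used here only to
        # illustrate how a key is mapped to a reduce partition.
        h = 0
        for ch in s:
            h = (31 * h + ord(ch)) & 0xFFFFFFFF
        return h - 0x100000000 if h > 0x7FFFFFFF else h

    for key in ('NY', 'Other'):
        partition = (java_string_hashcode(key) & 0x7FFFFFFF) % NUM_REDUCERS
        print('{0} -> reduce task {1}'.format(key, partition))

Whether NY and Other end up on different reduce tasks therefore depends on how the two keys hash and on how many reduce tasks the job is configured with.
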
However, I want to change the reduce or map function so that each reduce task processes only one key, i.e. one reduce task handles only NY as its key and the other task handles the other key. I would like one result file to contain:

NY 1258, Others 0; and the other: NY 0, Others 5677.

How can I adjust my functions to achieve the expected result?

wqsoz72f:

You probably need to use Python iterators and generators. A good example is this link. I have tried rewriting your code in the same style (untested).
Mapper:


#!/usr/bin/env python

"""A more advanced Mapper, using Python iterators and generators."""

import csv
import sys

def main(separator='\t'):
    reader = csv.reader(sys.stdin, delimiter=',')
    for entry in reader:
        if len(entry) == 22:
            registration_state = entry[16]
            print('%s%s%d' % (registration_state, separator, 1))

if __name__ == "__main__":
    main()

Reducer:

#!/usr/bin/env python

"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # Map output arrives on stdin, already sorted by key by the framework
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print('%s%s%d' % (current_word, separator, total_count))
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
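
A note on why groupby works here: Hadoop sorts the map output by key before handing it to each reducer, so all lines for a given key arrive consecutively. Locally the whole pipeline can be emulated with something like cat data.csv | python mapper.py | sort | python reducer.py (the file name is a placeholder), where sort stands in for the shuffle phase. A small simulation of the reducer's input and output:

    import io
    from itertools import groupby
    from operator import itemgetter

    # Simulated, already-sorted map output, as the framework would deliver it.
    mapped = io.StringIO('NY\t1\nNY\t1\nOther\t1\n')

    data = (line.rstrip().split('\t', 1) for line in mapped)
    for key, group in groupby(data, itemgetter(0)):
        print('{0}\t{1}'.format(key, sum(int(count) for _, count in group)))
    # NY     2
    # Other  1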
