python-如何将目录作为mapreduce输入传递

nbysray5  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(331)

我用python编写了一个简单的mapreduce示例。例如,如果输入是一个文件 text 为了运行代码,我们只需使用以下模式: cat <data> | map | sort | reduce 例如,在我的案例中: cat data | ./mapper.py | sort | ./reducer.py 一切正常。
但是我改变了我的Map器和缩小器,从一个 directory 包含 .gz 文件夹。所以我应该通过考试 path of the directory 作为输入。我测试以下终端命令 cat dat/ | ./mapper.py | sort | ./reducer.py 而包含数据的目录 dat/ ,但我面临错误:

cat: dat/: Is a directory
Traceback (most recent call last):
  File "./mapper.py", line 9, in <module>
    for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'

如何在python中将目录作为输入传递给mapreduce?
以下是我的代码:
Map器.py


# !/usr/bin/env python

import sys

# import timeit

import glob
import gzip

QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
    f = gzip.open(filename, 'r')
    for line in f:
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if temp != MISSING and q in QUALITY:
            print " %s\t%s" % (year, temp)

异径管.py


# !/usr/bin/env python

import sys

max_val = -sys.maxint
key = ''
for line in sys.stdin:
    (key, val) = line.strip().split('\t')
    max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
6ojccjat

6ojccjat1#

要获取当前工作目录的路径,请使用:

import os
path = os.getcwd()

您可以从此文件中获取所有文件:

filenames = os.listdir(path)

# filter files that doesn't have .gz filetype

filenames = [file_name for file_name in filenames if file_name.endswith('.gz')]

您只需使用以下命令对文件进行迭代:

for filename in filenames:
    f = gzip.open(path+filename, 'r')
k2fxgqgv

k2fxgqgv2#

线路 for filename in glob.glob(sys.stdin + '*.gz'): 需要来自 stdin . 因此,只需传递一个字符串( echo )而不是文件内容( cat ):

$ echo dat/ | ./mapper.py | sort | ./reducer.py

但是,为什么要通过管道传递参数?python通常通过 sys.argv (或者最好是通过“argparse”之类的解释器)。

相关问题