python mrjob mapreduce如何预处理输入文件

egmofgnx  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(515)

我正在尝试预处理一个xml文件,以便在放入mapreduce之前提取某些节点。我有以下代码:

from mrjob.compat import jobconf_from_env
from mrjob.job import MRJob
from mrjob.util import cmd_line, bash_wrap

class MRCountLinesByFile(MRJob):
    def configure_options(self):
        super(MRCountLinesByFile, self).configure_options()
        self.add_file_option('--filter')

    def mapper_cmd(self):
        cmd = cmd_line([self.options.filter, jobconf_from_env('mapreduce.map.input.file'])
        return cmd

if __name__ == '__main__':
    MRCountLinesByFile.run()

在命令行中,我键入:

python3 test_job_conf.py --filter ./filter.py -r local < test.txt
``` `test.txt` 是一个普通的xml文件。而 `filter.py` 是查找所有标题信息的脚本。
但是,我得到以下错误:

Creating temp directory /tmp/test_job_conf.vagrant.20160406.042648.689625
Running step 1 of 1...
Traceback (most recent call last):
File "./filter.py", line 8, in
with open(filename) as f:
FileNotFoundError: [Errno 2] No such file or directory: 'None'
Step 1 of 1 failed: Command '['./filter.py', 'None']' returned non-zero exit status 1

看起来像 `mapreduce.map.input.file` 提供 `None` 在这种情况下。我怎么能问 `mapper_cmd` 函数读取 `mrjob` 当前正在阅读?
vyu0f0g1

vyu0f0g11#

根据我的理解,在your self.add\ file\选项中应该有指向文件的路径。

self.add_file_option('--items', help='Path to u.item')

我不太明白你的想法,但这是我的理解。您可以使用configure选项来确保将给定的文件发送给所有Map程序进行处理,例如,当您要对源文件以外的其他文件中的数据执行辅助查找时。此辅助查找文件由self提供。添加文件选项('--items',help='path to u.item')。
要在reducer或mapper阶段之前进行预处理,可以使用reducer\u init或mapper\u init。这些初始化或处理步骤也需要在step函数中提及,例如下面所示。

def steps(self):
        return [
            MRStep(mapper=self.mapper_get_name,
                   reducer_init=self.reducer_init,
                   reducer=self.reducer_count_name),
            MRStep(reducer = self.reducer_find_maxname)
        ]

在init函数中,您可以在发送到mapper或reducer之前对需要执行的操作进行实际的预处理。例如,打开一个文件xyz,将第一个字段中的值复制到另一个字段中,我将在我的reducer中使用该字段并输出相同的值。

def reducer_init(self):
        self.movieNames = {}    
        with open("xyz") as f:
            for line in f:
                fields = line.split('|')
                self.myNames[fields[0]] = fields[1]

希望这有帮助!!

相关问题