用python在hadoop流媒体中使用文件

33qvvth1  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(368)

我对hadoop和mapreduce是完全陌生的,我正在努力解决它。我正在尝试用python开发一个mapreduce应用程序,其中使用2.csv文件中的数据。我只是在mapper中读取这两个文件,然后将这些文件中的键值对打印到sys.stdout
当我在一台机器上使用这个程序时,它运行得很好,但是在hadoop流媒体中,我得到了一个错误。我想我在hadoop上读取mapper中的文件时犯了一些错误。请帮助我的代码,并告诉我如何使用hadoop流文件处理。mapper.py代码如下(您可以从注解中理解代码):


# !/usr/bin/env python

import sys
from numpy import genfromtxt

def read_input(inVal):
    for line in inVal:
        # split the line into words
        yield line.strip()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    labels=[]
    data=[]    
    incoming = read_input(sys.stdin)
    for vals in incoming:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited;
        if len(vals) > 10:
            data.append(vals)
        else:
            labels.append(vals)

    for i in range(0,len(labels)):
        print "%s%s%s\n" % (labels[i], separator, data[i])

if __name__ == "__main__":
    main()

有60000条记录从两个.csv文件输入到此Map器,如下所示(在单机上,而不是hadoop群集上):

cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py
kmynzznz

kmynzznz1#

你没有发布你的错误。在流式处理中,您需要传递-file参数或-input,这样文件就可以与流式处理作业一起上载,或者知道在hdfs上的何处可以找到它。

iq0todco

iq0todco2#

我能够解决这个问题后,搜索了大约3天的解决方案。
问题在于更新版本的hadoop(在我的例子中是2.2.0)。当从文件中读取值时,mapper代码在某个时刻给出了一个非零的退出代码(可能是因为它一次读取了一个巨大的值列表(784))。hadoop2.2.0中有一个设置,它告诉hadoop系统给出一个常规错误(子进程失败,代码为1)。默认情况下,此设置设置为true。我只需要将这个属性的值设置为false,它使我的代码运行时没有任何错误。
设置为:stream.non.zero.exit.is.failure。流式处理时只需将其设置为false。所以流式处理命令有点像:


**hadoop jar ... -D stream.non.zero.exit.is.failure=false ...**

希望它能帮助别人,并节省3天…;)

相关问题