我对hadoop和mapreduce是完全陌生的,我正在努力解决它。我正在尝试用python开发一个mapreduce应用程序,其中使用2.csv文件中的数据。我只是在mapper中读取这两个文件,然后将这些文件中的键值对打印到sys.stdout
当我在一台机器上使用这个程序时,它运行得很好,但是在hadoop流媒体中,我得到了一个错误。我想我在hadoop上读取mapper中的文件时犯了一些错误。请帮助我的代码,并告诉我如何使用hadoop流文件处理。mapper.py代码如下(您可以从注解中理解代码):
# !/usr/bin/env python
import sys
from numpy import genfromtxt
def read_input(inVal):
for line in inVal:
# split the line into words
yield line.strip()
def main(separator='\t'):
# input comes from STDIN (standard input)
labels=[]
data=[]
incoming = read_input(sys.stdin)
for vals in incoming:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited;
if len(vals) > 10:
data.append(vals)
else:
labels.append(vals)
for i in range(0,len(labels)):
print "%s%s%s\n" % (labels[i], separator, data[i])
if __name__ == "__main__":
main()
有60000条记录从两个.csv文件输入到此Map器,如下所示(在单机上,而不是hadoop群集上):
cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py
2条答案
按热度按时间kmynzznz1#
你没有发布你的错误。在流式处理中,您需要传递-file参数或-input,这样文件就可以与流式处理作业一起上载,或者知道在hdfs上的何处可以找到它。
iq0todco2#
我能够解决这个问题后,搜索了大约3天的解决方案。
问题在于更新版本的hadoop(在我的例子中是2.2.0)。当从文件中读取值时,mapper代码在某个时刻给出了一个非零的退出代码(可能是因为它一次读取了一个巨大的值列表(784))。hadoop2.2.0中有一个设置,它告诉hadoop系统给出一个常规错误(子进程失败,代码为1)。默认情况下,此设置设置为true。我只需要将这个属性的值设置为false,它使我的代码运行时没有任何错误。
设置为:stream.non.zero.exit.is.failure。流式处理时只需将其设置为false。所以流式处理命令有点像:
希望它能帮助别人,并节省3天…;)