我正在尝试使用cloudera5.5.0实现一个简单的hadoop map reduce示例map&reduce步骤应该使用python2.6.6实现
问题:
如果脚本是在unix命令行上执行的,那么它们工作得非常好,并产生了预期的输出。
cat join2*.txt |./join3_mapper.py | sort |./join3_reducer.py
但是,将脚本作为hadoop任务执行会非常失败:
hadoop jar/usr/lib/hadoop mapreduce/hadoop-streaming.jar-input/user/cloudera/inputv/join2\u gen*.txt-output/user/cloudera/output\u tv-mapper/home/cloudera/join3\u mapper.py-reducer/home/cloudera/join3\u reducer.py-numreducetasks 1 16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538) at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Map器可以工作,如果hadoop命令是用-numreducetasks 0执行的,则hadoop作业只执行map步骤,成功结束,并且输出目录包含map步骤的结果文件。
我想那减少步骤一定有问题吧?
stderr logs in hue没有显示任何相关内容:
日志上载时间:星期三1月6日12:33:10-0800 2016日志长度:222 log4j:警告找不到记录器的附加程序(org.apache.hadoop.ipc.server)。log4j:warn请正确初始化log4j系统。log4j:请参阅http://logging.apache.org/log4j/1.2/faq.html#noconfig 更多信息。
脚本代码:第一个文件:join3\u mapper.py
# !/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip() #strip out carriage return
tuple2 = line.split(",") #split line, into key and value, returns a list
if len(tuple2) == 2:
key = tuple2[0]
value = tuple2[1]
if value == 'ABC':
print('%s\t%s' % (key, value) )
elif value.isdigit():
print('%s\t%s' % (key, value) )
第二个文件:join3\u reducer.py
# !/usr/bin/env python
import sys
last_key = None #initialize these variables
running_total = 0
abcFound =False;
this_key = None
# -----------------------------------
# Loop the file
# --------------------------------
for input_line in sys.stdin:
input_line = input_line.strip()
# --------------------------------
# Get Next Key value pair, splitting at tab
# --------------------------------
tuple2 = input_line.split("\t")
this_key = tuple2[0]
value = tuple2[1]
if value.isdigit():
value = int(value)
# ---------------------------------
# Key Check part
# if this current key is same
# as the last one Consolidate
# otherwise Emit
# ---------------------------------
if last_key == this_key:
if value == 'ABC': # filter for only ABC in TV shows
abcFound=True;
else:
if isinstance(value, (int,long) ):
running_total += value
else:
if last_key: #if this key is different from last key, and the previous
# (ie last) key is not empy,
# then output
# the previous <key running-count>
if abcFound:
print('%s\t%s' % (last_key, running_total) )
abcFound=False;
running_total = value #reset values
last_key = this_key
if last_key == this_key:
print('%s\t%s' % (last_key, running_total) )
我尝试了各种不同的方法将输入文件声明为hadoop命令,没有区别,也没有成功。
我做错什么了?提示,非常感谢您的想法谢谢
2条答案
按热度按时间jtjikinw1#
尝试将所有输入文本文件放在一个目录中,然后将该目录作为输入传递。这样您就不必合并所有的输入文件
ibrsph3r2#
多么幸运的一拳,和那一拳搏斗了好几天,我知道我成功了:
自本地(unix)执行
工作得很好我想到了使用1个合并的输入文件,而不是提供的6个输入文件,所以:
然后再次执行相同的hadoop命令,将输入定向到输入文件夹中的mergedinputfile-->完美结果,没有问题,没有异常工作完成。
对我来说,这提出了一个问题:
为什么它只处理一个合并的输入文件,而现在只提供较小的6个文件??不知道(还没有)