我正在编写一个hadoop流作业,其中Map器是用python编写的简单数据清理,在reducer部分,我想使用r运行某些时间序列分析。然而,调试mapreduce作业本身并不是那么容易,我最终以一种能够识别键和值的方式编写了我的reduce,然后将它们写出结果而不做任何修改。但是,它仍然不起作用,我的python代码做了完全相同的事情,可以毫无问题地工作。
当streaing作业使用r作为reducer时,错误消息如下所示:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320) at
org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533) at
org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at
org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:237) at
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459) at
org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at
org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167) at
java.security.AccessController.doPrivileged(Native Method) at
javax.security.auth.Subject.doAs(Subject.java:415) at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554) at
org.apache.hadoop.mapred.yarnchild.main(yarnchild。java:162)
我在一个由4个数据节点(每个64gbmem)组成的集群上运行这个流作业,它总共生成了大约500个Map器和60个缩减器。我的两种语言的代码都发布了。欢迎任何建议或帮助!
这是减速机
# !/usr/bin/python
import sys
delimiter = '\t'
for line in sys.stdin:
line = line.strip()
mykey, myvalue = line.split(delimiter)
print delimiter.join([mykey, myvalue])
这是减速器
# !/usr/bin/Rscript
library(dplyr)
library(outliers)
library(zoo)
library(forecast)
# library(tsoutliers)
f <- file("stdin")
open(f, open="r")
options(warn=-1)
mydelimiter <- '\t'
sink('/dev/null')
while(length(line<-readLines(f, n=1)) > 0){
tryCatch(
{
line <- gsub('\n', '', line)
fields <- unlist(strsplit(line, split=mydelimiter))
mykey_new <- fields[1]
myvalue_new <- fields[2]
sink()
cat(mykey_new);cat(mydelimiter);cat(myvalue_new);cat('\n')
sink('/dev/null')
},
error=function(e){}
)
}
close(f)
1条答案
按热度按时间fhity93d1#
您得到的java错误只是告诉您运行程序失败,但它没有告诉您程序中的问题是什么。您需要从容器中获取stderr日志。
如果你去你的jobtracker页面(比如http://myserver.com:50030/jobtracker.jsp)并找到失败的作业,您可以单击“失败的任务”列中的数字,然后从那里访问任务日志。你应该能在那里找到问题。