如何从mapper获取hadoop输入文件名?

4dbbbstv  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(485)

hadoop streaming通过环境变量使文件名可用于每个map任务。
Python:

os.environ["map.input.file"]

java 语:

System.getenv(“map.input.file”).

鲁比呢?

mapper.rb

# !/usr/bin/env ruby

STDIN.each_line do |line|
  line.split.each do |word|
    word = word[/([a-zA-Z0-9]+)/] 
    word = word.gsub(/ /,"")
    puts [word, 1].join("\t")
  end
end

puts ENV['map.input.file']
laximzn5

laximzn51#

怎么样:

ENV['map.input.file']

ruby让您可以轻松地分配给env散列:

ENV['map.input.file'] = '/path/to/file'
cpjpxq1n

cpjpxq1n2#

使用op的输入,我尝试了mapper:


# !/usr/bin/python

import os
file_name = os.getenv('map_input_file')
print file_name

以及使用以下命令的标准字数缩减器:

hadoop fs -rmr /user/itsjeevs/wc && 
hadoop jar $STRMJAR  -files /home/jejoseph/wc_mapper.py,/home/jejoseph/wc_reducer.py \
    -mapper wc_mapper.py  \
    -reducer wc_reducer.py \
    -numReduceTasks 10  \
    -input "/data/*"  \
    -output wc

出错失败:

16/03/10 15:21:32 INFO mapreduce.Job: Task Id : attempt_1455931799889_822384_m_000043_0, Status : FAILED
Error: java.io.IOException: Stream closed
    at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:434)
    at java.io.OutputStream.write(OutputStream.java:116)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

16/03/10 15:21:32 INFO mapreduce.Job: Task Id : attempt_1455931799889_822384_m_000077_0, Status : FAILED
Error: java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:345)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

不知道发生了什么。

py49o6xq

py49o6xq3#

所有jobconf变量都通过hadoop流放入环境变量中。变量名通过转换任何不在中的字符而变得“安全” 0-9 A-Z a-z_ .
所以map.input.file=>Map输入文件
尝试: puts ENV['map_input_file']

相关问题