我正在使用mapper生成两个输出文件。我正在使用multipleoutput api生成两个输出。我不确定是否正确。这是我的代码。。请仔细阅读并给出您的建议..当我运行代码时,我发现错误:java.lang.nullpointerexception。。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Export_Column_Mapping
{
public static void main(String[] args) throws Exception
{
String Output_filetype = args[2];
String Input_column_number = args[3];
String Output_column_number = args[4];
Configuration conf = new Configuration();
conf.setStrings("output_filetype",Output_filetype);
conf.setStrings("Input_column_number",Input_column_number);
conf.setStrings("Output_column_number",Output_column_number);
Job job = new Job(conf, "Export_Column_Mapping");
job.setJarByClass(Export_Column_Mapping.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(Export_Column_Mapping_Mapper.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class Export_Column_Mapping_Mapper extends Mapper<LongWritable,Text,Text,Text>
{
private MultipleOutputs<Text, LongWritable> mos ;
public void setup(TaskInputOutputContext<?, ?, Text, LongWritable> context) {
mos = new MultipleOutputs<Text, LongWritable>(context);
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String str_Output_filetype = context.getConfiguration().get("output_filetype");
String str_Input_column_number = context.getConfiguration().get("Input_column_number");
String[] input_columns_number = str_Input_column_number.split(",");
String str_Output_column_number= context.getConfiguration().get("Output_column_number");
String[] output_columns_number = str_Output_column_number.split(",");
String str_line = value.toString();
String[] input_column_array = str_line.split(",");
String[] Detail_output_column_array = new String[27];
String[] Shop_output_column_array = new String[8];
String details_output = null ;
String Shop_output = null;
try
{
for(int i = 0;i<=input_column_array.length+1; i++)
{
int int_outputcolumn = Integer.parseInt(output_columns_number[i]);
int int_inputcolumn = Integer.parseInt(input_columns_number[i]);
if((int_inputcolumn != 0) && (int_outputcolumn != 0) && output_columns_number.length == input_columns_number.length){
Detail_output_column_array[int_outputcolumn-1] = input_column_array[int_inputcolumn-1];
Shop_output_column_array[0] = Detail_output_column_array[0];
Shop_output_column_array[1] = Detail_output_column_array[1];
Shop_output_column_array[2] = Detail_output_column_array[2];
Shop_output_column_array[3] = Detail_output_column_array[3];
Shop_output_column_array[4] = Detail_output_column_array[14];
if(details_output != null)
{
details_output = details_output+" "+ Detail_output_column_array[int_outputcolumn-1];
Shop_output = Shop_output+" "+ Shop_output_column_array[int_outputcolumn-1];
}else
{
details_output = Detail_output_column_array[int_outputcolumn-1];
Shop_output = Shop_output_column_array[int_outputcolumn-1];
}
}
}
}catch (Exception e)
{
}
mos.write("Details File", null, details_output);
mos.write("Shop File", null, Shop_output);
}
}
}
这是日志。。
错误:com.nielsen.grfe.export\u column\u mapping$export\u column\u mapping\u mapper.map上的java.lang.nullpointerexception(export\u column\u mapping)。java:113)在com.nielsen.grfe.export\u column\u mapping$export\u column\u mapping\u mapper.map(export\u column\u mapping)。java:1)在org.apache.hadoop.mapreduce.mapper.run(mapper。java:145)在org.apache.hadoop.mapred.maptask.runnewmapper(maptask。java:787)在org.apache.hadoop.mapred.maptask.run(maptask。java:341)在org.apache.hadoop.mapred.yarnchild$2.run(yarnchild。java:163)位于javax.security.auth.subject.doas(subject)的java.security.accesscontroller.doprivileged(本机方法)。java:415)在org.apache.hadoop.security.usergroupinformation.doas(用户组信息。java:1671)在org.apache.hadoop.mapred.yarnchild.main(yarnchild。java:158)
1条答案
按热度按时间wztqucjr1#
多重输出的定义如下:
而写以下是code:-
应该是这样的
改变你的定义,写下相应的东西如下:
还包括清理代码: