我想创建一个简单的函数来筛选CSV文件中的非空元组。输入是CSV文件的每一行:如果该行的值都不为null,则原样输出这一元组。
我的程序如下:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class Selectfunction {
public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text>{
// Map void
public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
//input
Text cle = new Text();
//int valeur1 = 0;
//int valeur2 = 1;
String[] tokens = value.toString().split(",");
String cle1 = tokens.toString();
for (int i=0;i<tokens.length;i++) {
// System.out.println("hana");
if(tokens[5].toString().equals(null)){
value.set(value);
}
cle.set(cle1);
//output.collect(word, one);
output.collect(new Text(cle), value);
}
}
}
public static void main(String args[])throws Exception
{
if(args.length!=2){
System.err.println("Usage: WeatherTemperature <input path> <output path>");
System.exit(-1);
}
// Create a new JobConf
JobConf job = new JobConf(new Configuration(), Selectfunction.class);
// Specify various job-specific parameters
job.setJobName("myjob");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(0);
job.setMapperClass(Selectfunction.Map.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
JobClient.runJob(job);
}
}
我收到以下错误:
16/05/25 23:32:26 INFO mapreduce.Job: Running job: job_1448020964278_0451
16/05/25 23:32:36 INFO mapreduce.Job: Job job_1448020964278_0451 running in uber mode : false
16/05/25 23:32:36 INFO mapreduce.Job: map 0% reduce 0%
16/05/25 23:32:44 INFO mapreduce.Job: Task Id : attempt_1448020964278_0451_m_000000_0, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at select.Selectfunction$Map.map(Selectfunction.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
16/05/25 23:32:45 INFO mapreduce.Job: Task Id : attempt_1448020964278_0451_m_000001_0, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
at select.Selectfunction$Map.map(Selectfunction.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
1条答案
按热度按时间k4aesqcs1#
您使用的是TextInputFormat,它所使用的RecordReader以LongWritable作为键、以Text作为值返回记录,而您的Mapper却把输入键声明为Text,这就是出现上述ClassCastException的原因。请参阅本文档。
将方法签名更改为“public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException”(同时把Mapper的第一个泛型参数也改为LongWritable)可能有助于解决这个问题。
-阿米特