Here is my basic MapReduce program. I am trying to create a Writable object to hold my data, the NYC311_Writable class, and I am getting a java.lang.ClassCastException. Any suggestions?
The error is as follows:
17/04/11 14:54:05 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
17/04/11 14:54:05 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.ClassCastException: class nyc311.NYC311_Writable
at java.lang.Class.asSubclass(Class.java:3404)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Main class:
package nyc311;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author dhaval
 */
public class NYC311 {

    /**
     * @param args the command line arguments
     * @throws java.io.IOException
     * @throws java.lang.InterruptedException
     * @throws java.lang.ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "NYC311");
        job.setJarByClass(NYC311.class);
        job.setMapperClass(NYC311_Mapper.class);
        job.setMapOutputKeyClass(NYC311_Writable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(NYC311_Reducer.class);
        job.setOutputKeyClass(NYC311_Writable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The Writable class:
package nyc311;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * @author dhaval
 */
public class NYC311_Writable implements Writable {

    private String Incident_Zip;

    public NYC311_Writable() {
    }

    public NYC311_Writable(String Incident_Zip) {
        this.Incident_Zip = Incident_Zip;
    }

    public String getIncident_Zip() {
        return Incident_Zip;
    }

    public void setIncident_Zip(String Incident_Zip) {
        this.Incident_Zip = Incident_Zip;
    }

    @Override
    public void write(DataOutput d) throws IOException {
        // Serialize the zip code string.
        WritableUtils.writeString(d, Incident_Zip);
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        // Deserialize the zip code string.
        Incident_Zip = WritableUtils.readString(di);
    }

    public static NYC311_Writable read(DataInput in) throws IOException {
        NYC311_Writable w = new NYC311_Writable();
        w.readFields(in);
        return w;
    }
}
The Mapper:
package nyc311;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author dhaval
 */
public class NYC311_Mapper extends Mapper<Object, Text, NYC311_Writable, IntWritable> {

    private IntWritable count = new IntWritable(1);

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split(",");
        // Skip rows whose ninth column contains lowercase letters,
        // i.e. is not a plain zip code.
        if (!line[8].matches(".*[a-z].*")) {
            NYC311_Writable nyc_data = new NYC311_Writable();
            nyc_data.setIncident_Zip(line[8]);
            context.write(nyc_data, count);
        }
    }
}
The Reducer:
package nyc311;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author dhaval
 */
public class NYC311_Reducer extends Reducer<NYC311_Writable, IntWritable, NYC311_Writable, IntWritable> {

    private IntWritable count = new IntWritable();

    @Override
    protected void reduce(NYC311_Writable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the per-record counts for this zip code.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        count.set(sum);
        context.write(key, count);
    }
}
1 Answer
I think you are missing Comparable: a class used as a map output key must implement WritableComparable, not just Writable, because the framework has to sort keys during the shuffle (that is what getOutputKeyComparator in your stack trace is failing on). Here is a blog that guides you through creating a custom Hadoop Writable data type: hadooptutorial.info/creating-custom-hadoop-writable-data-type
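For illustration, a minimal sketch of what that change could look like, keeping your field and serialization logic. The compareTo, hashCode, equals, and toString choices below are assumptions (ordering and partitioning keys by the zip string), not the only valid ones:

package nyc311;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

// Sketch: implement WritableComparable instead of Writable so the
// shuffle can sort map output keys.
public class NYC311_Writable implements WritableComparable<NYC311_Writable> {

    private String Incident_Zip;

    public NYC311_Writable() {
    }

    // Constructor, getter, and setter as in your original class.
    public NYC311_Writable(String Incident_Zip) {
        this.Incident_Zip = Incident_Zip;
    }

    public String getIncident_Zip() {
        return Incident_Zip;
    }

    public void setIncident_Zip(String Incident_Zip) {
        this.Incident_Zip = Incident_Zip;
    }

    @Override
    public void write(DataOutput d) throws IOException {
        WritableUtils.writeString(d, Incident_Zip);
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        Incident_Zip = WritableUtils.readString(di);
    }

    // Assumption: keys are ordered lexicographically by zip code.
    @Override
    public int compareTo(NYC311_Writable other) {
        return Incident_Zip.compareTo(other.Incident_Zip);
    }

    // The default HashPartitioner uses hashCode, so equal keys must
    // hash alike to land on the same reducer.
    @Override
    public int hashCode() {
        return Incident_Zip.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NYC311_Writable
                && Incident_Zip.equals(((NYC311_Writable) o).Incident_Zip);
    }

    // TextOutputFormat writes keys via toString, so return the zip
    // to keep the job output readable.
    @Override
    public String toString() {
        return Incident_Zip;
    }
}

With this change the driver, mapper, and reducer should work unchanged, since setMapOutputKeyClass(NYC311_Writable.class) only fails today because the key class cannot be cast to WritableComparable.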