I have the following classes for an MR job, but when I run the job it fails with the exception shown below.
public class MongoKey implements WritableComparable<MongoKey> {
    ...
    private Text name;
    private Text place;

    public MongoKey() {
        this.name = new Text();
        this.place = new Text();
    }

    public MongoKey(Text name, Text place) {
        this.name = name;
        this.place = place;
    }

    public void readFields(DataInput in) throws IOException {
        name.readFields(in);
        place.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        name.write(out);
        place.write(out);
    }

    public int compareTo(MongoKey o) {
        MongoKey other = (MongoKey)o;
        int cmp = name.compareTo(other.name);
        if(cmp != 0){
            return cmp;
        }
        return place.compareTo(other.place);
    }
}

public class MongoValue implements Writable {
    ...
    public void readFields(DataInput in) throws IOException {
        profession.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        profession.write(out);
    }
}

public class MongoReducer extends Reducer<MongoKey, MongoValue, MongoKey, BSONWritable> {
    ...
    context.write(key, new BSONWritable(output)); // line 41
}

public class MongoHadoopJobRunner extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage: [input] [output]");
            System.exit(-1);
        }
        Configuration conf = getConf();
        for (String arg : args)
            System.out.println(arg);
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        conf.set("mongo.output.uri", "mongodb://localhost/demo.logs_aggregate");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost/demo.logs_aggregate");
        MongoConfigUtil.setOutputFormat(conf, MongoOutputFormat.class);
        final Job job = new Job(conf, "mongo_hadoop");
        job.setOutputFormatClass(MongoOutputFormat.class);
        // Job job = new Job();
        job.setJarByClass(MongoHadoopJobRunner.class);
        // job.setJobName("mongo_hadoop");
        job.setNumReduceTasks(1);
        job.setMapperClass(MongoMapper.class);
        job.setReducerClass(MongoReducer.class);
        job.setMapOutputKeyClass(MongoKey.class);
        job.setMapOutputValueClass(MongoValue.class);
        job.setOutputKeyClass(MongoKey.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setInputFormatClass(MongoInputFormat.class);
        for (String arg2 : parser.getRemainingArgs()) {
            System.out.println("remaining: " + arg2);
        }
        Path inPath = new Path(parser.getRemainingArgs()[0]);
        MongoInputFormat.addInputPath(job, inPath);
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] pArgs) throws Exception {
        Configuration conf = new Configuration();
        for (String arg : pArgs) {
            System.out.println(arg);
        }
        GenericOptionsParser parser = new GenericOptionsParser(conf, pArgs);
        for (String arg2 : parser.getRemainingArgs()) {
            System.out.println("ree" + arg2);
        }
        System.exit(ToolRunner.run(conf, new MongoHadoopJobRunner(), parser
                .getRemainingArgs()));
    }
}

The job fails with the following exception:
java.lang.Exception: java.lang.IllegalArgumentException: can't serialize class com.name.custom.MongoKey
...
...
at com.mongodb.hadoop.output.MongoRecordWriter.write(MongoRecordWriter.java:93)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at com.name.custom.MongoReducer.reduce(MongoReducer.java:41)
at com.name.custom.MongoReducer.reduce(MongoReducer.java:11)
As far as I can tell there is nothing wrong with the code, but I have no idea why it cannot serialize the fields.
Thanks a lot.

2 Answers

inkz8wg9 #1
This exception is thrown because MongoKey does not implement java.io.Serializable.
Add Serializable to the class declaration.
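
A minimal sketch of what adding Serializable could involve. It uses a hypothetical SerializableKey class with String fields in place of Text, so it runs without Hadoop on the classpath. Hadoop's Text does not itself implement java.io.Serializable, so fields of that type would have to be marked transient and written by hand in writeObject/readObject, as done here. Whether MongoRecordWriter then accepts such a key is an assumption this sketch does not test.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for MongoKey (not the asker's class):
// String fields replace Hadoop's Text so the sketch is self-contained.
class SerializableKey implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by default serialization and written by hand below,
    // which is what non-Serializable fields such as Text would require.
    private transient String name;
    private transient String place;

    SerializableKey(String name, String place) {
        this.name = name;
        this.place = place;
    }

    String getName() { return name; }
    String getPlace() { return place; }

    // Called by Java serialization when the object is written.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(name);
        out.writeUTF(place);
    }

    // Called by Java serialization when the object is read back.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        name = in.readUTF();
        place = in.readUTF();
    }

    // Serializes the key to a byte array and reads it back into a new instance.
    static SerializableKey roundTrip(SerializableKey key) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (SerializableKey) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

The roundTrip helper is only there to check that the hand-written writeObject/readObject pair restores both fields.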
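
zy1mlcev #2
As far as I can see from the MongoRecordWriter source code, it does not support arbitrary WritableComparable objects as keys. You can use one of these classes as the key: BSONWritable, BSONObject, Text, UTF8, or a simple wrapper such as IntWritable. I think you can also use a Serializable object as the key. So I can suggest two workarounds:
1. Make your MongoKey serializable (implements Serializable, and implement the writeObject and readObject methods).
2. Use one of the supported classes as the key, for example Text:
Text key = new Text(name.toString() + "\t" + place.toString());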
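
The tab-joined Text key above can be sketched as a small helper that builds the string and splits it back into its two fields. CompositeKey and its method names are hypothetical, and plain String is used so the sketch runs without Hadoop. In the reducer the joined value would be wrapped as new Text(CompositeKey.join(...)).

```java
// Hypothetical helper for the "name<TAB>place" key format; not part of mongo-hadoop.
final class CompositeKey {
    private static final String SEPARATOR = "\t";

    private CompositeKey() {}

    // Joins the two fields, rejecting values that would make the key ambiguous.
    static String join(String name, String place) {
        if (name.contains(SEPARATOR) || place.contains(SEPARATOR)) {
            throw new IllegalArgumentException("fields must not contain a tab character");
        }
        return name + SEPARATOR + place;
    }

    // Splits a joined key back into {name, place}.
    static String[] split(String key) {
        // A limit of -1 keeps a trailing empty field, e.g. an empty place.
        String[] parts = key.split(SEPARATOR, -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected exactly one tab in key: " + key);
        }
        return parts;
    }
}
```

Rejecting tabs inside the fields is a design choice of this sketch: without it, two different (name, place) pairs could produce the same key string.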