First, I am running a MapReduce job as a Java action through Oozie. I used avro-tools to extract the schema from the Avro file and then compiled it into a Java class (RxAvro.java).
Whenever I try to use the Avro object, I get the following ClassCastException:
2018-01-05 07:46:07,038 WARN [main] org.apache.hadoop.mapred.YarnChild:
Exception running child : java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.xxx.yyy.zzz.jobs.actions.test.RxAvro
at com.xxx.yyy.zzz.jobs.actions.test.AvroParqueMapreduce$Map.map(AvroParqueMapreduce.java:70)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
MapReduce code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import parquet.avro.AvroParquetInputFormat;

import java.io.IOException;

public class AvroParqueMapreduce extends Configured implements Tool {

    /**
     * Main entry point for the example.
     *
     * @param args arguments
     * @throws Exception when something goes wrong
     */
    public static void main(final String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AvroParqueMapreduce(), args);
        System.exit(res);
    }

    /**
     * The MapReduce driver - setup and launch the job.
     *
     * @param args the command-line arguments
     * @return the process exit code
     * @throws Exception if something goes wrong
     */
    public int run(final String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = super.getConf();
        Job job = new Job(conf);
        job.setJarByClass(AvroParqueMapreduce.class);

        job.setInputFormatClass(AvroParquetInputFormat.class);
        AvroParquetInputFormat.setInputPaths(job, inputPath);

        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class Map extends Mapper<Void, RxAvro, Text, Text> {
        @Override
        public void map(Void key,
                        RxAvro value,
                        Context context) throws IOException, InterruptedException {
            context.write(new Text(value.getObjectId().toString()),
                          new Text(value.getCollectionInterval().toString()));
        }
    }
}
Any help would be greatly appreciated.
1 Answer
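There are a couple of problems in the code. You need to set the input format class correctly: AvroParquetInputFormat is not the right input format for Avro data files.
In addition, you also need to set the input schema.
This tutorial can serve as a starting point: https://dzone.com/articles/mapreduce-avro-data-files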
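The two changes described above might look roughly like the sketch below. It is not the original answer's code. It assumes the input is a plain Avro container file (not Parquet), that the avro-mapred library is on the job classpath, and that RxAvro is the asker's generated class with the usual static getClassSchema() method. The class name AvroMapreduce is hypothetical. With AvroKeyInputFormat the record arrives as the map key wrapped in an AvroKey, and the map value is NullWritable, so the mapper signature changes as well.

```java
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

// Hypothetical driver name; RxAvro is the asker's Avro-generated class (assumed on the classpath).
public class AvroMapreduce extends Configured implements Tool {

    public static void main(final String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AvroMapreduce(), args));
    }

    @Override
    public int run(final String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(AvroMapreduce.class);

        // Read Avro container files instead of Parquet files.
        job.setInputFormatClass(AvroKeyInputFormat.class);
        // Tell the reader which schema to use for the input records.
        AvroJob.setInputKeySchema(job, RxAvro.getClassSchema());
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // The Avro record is delivered as the key; the value carries no data.
    public static class Map extends Mapper<AvroKey<RxAvro>, NullWritable, Text, Text> {
        @Override
        protected void map(AvroKey<RxAvro> key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            RxAvro record = key.datum();
            context.write(new Text(record.getObjectId().toString()),
                          new Text(record.getCollectionInterval().toString()));
        }
    }
}
```

Whether this removes the ClassCastException also depends on the generated RxAvro class being packaged in the job JAR that the map tasks load; if the class cannot be found at runtime, Avro may still fall back to generic records.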