I am using Hadoop to analyze GSOD data (ftp://ftp.ncdc.noaa.gov/pub/data/gsod/). I picked five years for my experiment (2005-2009). I have set up a small cluster and run a simple MapReduce program that finds the maximum temperature for a year.
Now I have to write a new MR program that counts, for each station, how often each phenomenon occurred across all of those years.
The files I have to analyze have the following structure:
STN--- ... FRSHTT
722115 110001
722115 011001
722110 111000
722110 001000
722000 001000
The STN column holds the station code, and the FRSHTT column encodes the following phenomena: F for fog, R for rain or drizzle, S for snow or ice pellets, H for hail, T for thunder, and O for tornado or funnel cloud.
A value of 1 means the phenomenon occurred that day; 0 means it did not.
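For illustration, a minimal decoding sketch in plain Java (independent of the MapReduce code below), using the first sample row's indicator string:

```java
// Decode one FRSHTT indicator string into phenomenon codes.
// "110001" is the first sample row above; prints: F R O
String[] codes = {"F", "R", "S", "H", "T", "O"};
String flags = "110001";
for (int i = 0; i < flags.length(); i++) {
    if (flags.charAt(i) == '1') {
        System.out.print(codes[i] + " ");
    }
}
```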
I need to get results like this:
722115: F = 1, R = 2, S = 1, O = 2
722110: F = 1, R = 1, S = 2
722000: S = 1
I can run the MR program, but the results are wrong; it gives me this:
722115 F, 1
722115 R, 1
722115 R, 1
722115 S, 1
722115 O, 1
722115 O, 1
722110 F, 1
722110 R, 1
722110 S, 1
722110 S, 1
722000 S, 1
This is the code I used:
Mapper.java

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, StationPhenomenun, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Every file starts with a field description line, so ignore that line
        if (!line.startsWith("STN---")) {
            // The first field of the line is the station code where the data was collected
            String station = line.substring(0, 6);
            // The six FRSHTT indicator characters sit at fixed column offsets
            String fog = line.substring(132, 133);
            String rainOrDrizzle = line.substring(133, 134);
            String snowOrIcePellets = line.substring(134, 135);
            String hail = line.substring(135, 136);
            String thunder = line.substring(136, 137);
            String tornadoOrFunnelCloud = line.substring(137, 138);
            // Emit one (station, phenomenon) key with a count of 1 per occurrence
            if (fog.equals("1"))
                context.write(new StationPhenomenun(station, "F"), new IntWritable(1));
            if (rainOrDrizzle.equals("1"))
                context.write(new StationPhenomenun(station, "R"), new IntWritable(1));
            if (snowOrIcePellets.equals("1"))
                context.write(new StationPhenomenun(station, "S"), new IntWritable(1));
            if (hail.equals("1"))
                context.write(new StationPhenomenun(station, "H"), new IntWritable(1));
            if (thunder.equals("1"))
                context.write(new StationPhenomenun(station, "T"), new IntWritable(1));
            if (tornadoOrFunnelCloud.equals("1"))
                context.write(new StationPhenomenun(station, "O"), new IntWritable(1));
        }
    }
}
Reducer.java

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<StationPhenomenun, IntWritable, StationPhenomenun, IntWritable> {

    protected void reduce(StationPhenomenun key, Iterable<IntWritable> values, org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException {
        // Count how many times this (station, phenomenon) key was emitted
        int count = 0;
        for (IntWritable value : values) {
            count++;
        }
        String station = key.getStation().toString();
        String occurence = key.getPhenomenun().toString();
        StationPhenomenun textPair = new StationPhenomenun(station, occurence);
        context.write(textPair, new IntWritable(count));
    }
}
StationPhenomenun.java

public class StationPhenomenun implements WritableComparable<StationPhenomenun> {

    private String station;
    private String phenomenun;

    public StationPhenomenun(String station, String phenomenun) {
        this.station = station;
        this.phenomenun = phenomenun;
    }

    public StationPhenomenun() {
    }

    public String getStation() {
        return station;
    }

    public String getPhenomenun() {
        return phenomenun;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        station = in.readUTF();
        phenomenun = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(station);
        out.writeUTF(phenomenun);
    }

    @Override
    public int compareTo(StationPhenomenun t) {
        // Sort by station first, then by phenomenon
        int cmp = this.station.compareTo(t.station);
        if (cmp != 0) {
            return cmp;
        }
        return this.phenomenun.compareTo(t.phenomenun);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final StationPhenomenun other = (StationPhenomenun) obj;
        if (this.station != other.station && (this.station == null || !this.station.equals(other.station))) {
            return false;
        }
        if (this.phenomenun != other.phenomenun && (this.phenomenun == null || !this.phenomenun.equals(other.phenomenun))) {
            return false;
        }
        return true;
    }

    @Override
    public int hashCode() {
        return this.station.hashCode() * 163 + this.phenomenun.hashCode();
    }
}
NcdcJob.java

public class NcdcJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(NcdcJob.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/station"));
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setMapOutputKeyClass(StationPhenomenun.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(StationPhenomenun.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Has anyone done something similar?
P.S.: I have tried this solution (Hadoop composite key), but it didn't work for me.
1 Answer
Just check that the following two settings actually resolve to your custom implementations:

    job.setMapperClass(Mapper.class);
    job.setReducerClass(Reducer.class);

Because your classes share their names with Hadoop's own org.apache.hadoop.mapreduce.Mapper and org.apache.hadoop.mapreduce.Reducer, these references can silently bind to the built-in classes instead, depending on your imports. The built-in Reducer is an identity reducer that writes every incoming (key, value) pair through unchanged; that is exactly the unaggregated output you are getting. I was able to get the desired result with the following changes, after also renaming the classes to MyMapper and MyReducer.
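A minimal sketch of the corrected driver wiring, assuming only the renames described above (the answer does not show its full code):

```java
// Explicitly wire in the renamed classes; the names no longer collide with
// Hadoop's built-in org.apache.hadoop.mapreduce.Mapper / Reducer.
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
```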
With this simplified, comma-separated sample input (note it differs from the fixed-width GSOD layout, so the mapper's substring parsing would also need adjusting to test against it):

```
722115,1,1,0,0,0,1
722115,0,1,1,0,0,1
722110,1,1,1,0,0,0
722110,0,0,1,0,0,0
722000,0,0,1,0,0,0
```

the job produces the expected counts:
StationPhenomenun [station=722000, phenomenun=S] 1
StationPhenomenun [station=722110, phenomenun=F] 1
StationPhenomenun [station=722110, phenomenun=R] 1
StationPhenomenun [station=722110, phenomenun=S] 2
StationPhenomenun [station=722115, phenomenun=F] 1
StationPhenomenun [station=722115, phenomenun=O] 2
StationPhenomenun [station=722115, phenomenun=R] 2
StationPhenomenun [station=722115, phenomenun=S] 1
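The key format in that output suggests the answerer also added a toString() override to StationPhenomenun; a plausible sketch (an assumption, since the answer does not show this method):

```java
// Hypothetical toString() producing the key format printed above;
// not part of the originally posted class.
@Override
public String toString() {
    return "StationPhenomenun [station=" + station + ", phenomenun=" + phenomenun + "]";
}
```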