我运行这个程序,在编译代码时遇到了上下文错误,不知道如何修复请帮助
错误:找不到符号
public static void emit(int key, ArrayList<Double> values, Context context) throws IOException, InterruptedException {
^
符号:类上下文
位置:class movingavg
(获取错误在emit函数中找不到符号上下文)
package com.hadoop.imcdp;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MovingAvg {
// For production the windowlength would be a commandline or other argument
static double windowlength = 3.0;
static int thekey = (int) windowlength / 2;
// used for handlinag the circular list.
static boolean initialised = false;
// Sample window
static ArrayList <Double> window = new ArrayList <Double> ();
// The Map method processes the data one point at a time and passes the circular list to the
// reducer.
public static class Map extends Mapper <LongWritable, Text, Text, Text> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException {
double wlen = windowlength;
// creates windows of samples and sends them to the Reducer
partitionData(value, context, wlen);
}
// Create sample windows starting at each sata point and sends them to the reducer
private void partitionData(Text value, Context context, double wlen)
throws IOException {
String line = value.toString();
// the division must be done this way in the mapper.
Double ival = new Double(line) / wlen;
// Build initial sample window
if (window.size() < windowlength) {
window.add(ival);
}
// emit first window
if (!initialised && window.size() == windowlength) {
initialised = true;
emit(thekey, window, context);
thekey++;
return;
}
// Update and emit subsequent windows
if (initialised) {
// remove oldest datum
window.remove(0);
// add new datum
window.add(ival);
emit(thekey, window, context);
thekey++;
}
}
}
// Transform list to a string and send to reducer. Text to be replaced by ObjectWritable
// Problem: Hadoop apparently requires all output formats to be the same so
// cannot make this output collector differ from the one the reducer uses.
public static void emit(int key,
ArrayList <Double> value,
Context context) throws IOException {
// public static void emit(int key, ArrayList<Double> value, OutputCollector<Text,Text> output) throws IOException, InterruptedException {
Text tx = new Text();
tx.set(new Integer(key).toString());
String outstring = value.toString();
// remove the square brackets Java puts in
String tidied = outstring.substring(1, outstring.length() - 1).trim();
Text out = new Text();
out.set(tidied);
context.write(tx, out);
}
public static class Reduce extends Reducer <Text, Text, Text, Text> {
public void reduce(Text key,
Iterator <Text> values,
Context context
) throws IOException {
while (values.hasNext()) {
computeAverage(key, values, context);
}
}
// computes the average of each window and sends to ouptut collector.
private void computeAverage(Text key, Iterator <Text> values, Context context)
throws IOException {
double sum = 0;
String thevalue = values.next().toString();
String[] thenumbers = thevalue.split(",");
for (String temp: thenumbers) {
// need to trim the string because the constructor does not trim.
Double ds = new Double(temp.trim());
sum += ds;
}
Text out = new Text();
String outstring = Double.toString(sum);
out.set(outstring);
context.write(key, out);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(MovingAvg.class);
job.setJobName("MovingAvg");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
1条答案
按热度按时间0ejtzxu11#
您粘贴的代码中没有上下文类的import语句。通常ide会自动处理这个问题。