我正在开发一个hadoop项目。我想找到某一天的顾客,然后写出当天消费量最大的顾客。在我的reducer类中,由于某种原因,全局变量max在for循环之后不会改变它的值。
编辑我想找到在某一天消费量最大的客户。我已经设法在我想要的日期找到了顾客,但是我在我的减速机课上遇到了一个问题。代码如下:
编辑#2我已经知道值(消耗)是自然数。所以在我的输出文件中,我只想成为某一天的客户,拥有最大消费量。
编辑#3我的输入文件由许多数据组成。它有三列;客户id、时间戳(yyyy-mm-dd hh:mm:ss)和消费
驾驶员等级
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class alicanteDriver {
public static void main(String[] args) throws Exception {
long t_start = System.currentTimeMillis();
long t_end;
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Alicante");
job.setJarByClass(alicanteDriver.class);
job.setMapperClass(alicanteMapperC.class);
//job.setCombinerClass(alicanteCombiner.class);
job.setPartitionerClass(alicantePartitioner.class);
job.setNumReduceTasks(2);
job.setReducerClass(alicanteReducerC.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
job.waitForCompletion(true);
t_end = System.currentTimeMillis();
System.out.println((t_end-t_start)/1000);
}
}
Map类
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class alicanteMapperC extends
Mapper<LongWritable, Text, Text, IntWritable> {
String Customer = new String();
SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date t = new Date();
IntWritable Consumption = new IntWritable();
int counter = 0;
// new vars
int max = 0;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Date d2 = null;
try {
d2 = ft.parse("2013-07-01 01:00:00");
} catch (ParseException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
if (counter > 0) {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line, ",");
while (itr.hasMoreTokens()) {
Customer = itr.nextToken();
try {
t = ft.parse(itr.nextToken());
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Consumption.set(Integer.parseInt(itr.nextToken()));
//sort out as many values as possible
if(Consumption.get() > max) {
max = Consumption.get();
}
//find customers in a certain date
if (t.compareTo(d2) == 0 && Consumption.get() == max) {
context.write(new Text(Customer), Consumption);
}
}
}
counter++;
}
}
减速器等级
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;
public class alicanteReducerC extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int max = 0; //this var
// declaration of Lists
List<Text> l1 = new ArrayList<Text>();
List<IntWritable> l2 = new ArrayList<IntWritable>();
for (IntWritable val : values) {
if (val.get() > max) {
max = val.get();
}
l1.add(key);
l2.add(val);
}
for (int i = 0; i < l1.size(); i++) {
if (l2.get(i).get() == max) {
context.write(key, new IntWritable(max));
}
}
}
}
输入文件的某些值
C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4
输出应为
C11FA586258 5
C11FA586413 5
我在论坛上搜索了几个小时,仍然找不到这个问题。有什么想法吗?
2条答案
按热度按时间l7wslrjt1#
下面是重构代码:您可以传递/更改消费日期的特定值。在这种情况下,你不需要减速器。我的第一个答案是从input查询最大消费量,这个答案是从input查询用户提供的消费量。
setup
方法将获取用户提供的值mapper.maxConsumption.date
把它们传给map
方法。cleaup
reducer中的方法扫描所有max consumpion客户,并在输入中写入最终max(本例中为5)-有关详细的执行日志,请参见屏幕截图:运行方式:
n3schb8v2#
可能所有的值都在0以下。尝试使用最小值来标识变量是否更改。
根据您所说的,输出应该只有0(在这种情况下,减速器中的最大值为0)或没有输出(所有值都小于0)。还有,看看这个
应该是的
编辑:我刚刚看到你的mapper类,它有很多问题。下面的代码将跳过每个Map器中的第一个元素。为什么?
我想,你是不是得到了这样的东西?”customer,2013-07-01 01:00:00,2,…“如果是这种情况,并且您已经在筛选值,则应将max变量声明为本地变量,而不是在Map器范围内,这将影响多个客户。
围绕这件事有很多问题。。你可以解释每个Map器的输入以及你想做什么。
根据你的回答,我会试试这个
每个客户发出一条记录非常简单