使用mapreduce作业后,以下是输出:
User16565 Logins: 1 Orders:1
User16566 Logins: 2 Orders:2
User16567 Logins: 1 Orders:1
一切看起来都很好,但是当日志文件有成千上万个条目时,它就没有什么帮助了。有没有办法把我的代码改成“登录”和“订单”的总和,这样我就可以计算出两者之间的差异?
编辑:新问题
日志示例:
2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-
2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-
2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-
2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-
我在代码中发现了一个错误。我意识到登录和订单数不正确。起初看起来输出是正确的,但是当我手动检查登录和订单时,我意识到有一个错误。输出:
User73511 Logins: 3 Orders:2
User14253 Logins: 1 Orders:1
应该是:
User73511 Logins: 1 Orders:2
User14253 Logins: 1 Orders:0
以下是全部代码:
public class UserOrderCount {
public static class SingleUserMapper extends
Mapper<LongWritable, Text, Text, CountInformationTuple> {
private Text outUserId = new Text();
private CountInformationTuple outCountOrder = new CountInformationTuple();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String tempString = value.toString();
String[] singleUserData = tempString.split(",");
String userId = singleUserData[3];
String featureId = singleUserData[1];
if (featureId.contains("feature:order-created")) {
outCountOrder.setCountOrder(1);
}
if (featureId.contains("feature:login")) {
outCountOrder.setCountLogin(1);
}
outUserId.set(userId);
context.write(outUserId, outCountOrder);
}
}
public static class SingleUserReducer extends
Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
private CountInformationTuple result = new CountInformationTuple();
public void reduce(Text key, Iterable<CountInformationTuple> values,
Context context) throws IOException, InterruptedException {
int login = 0;
int order = 0;
for (CountInformationTuple val : values) {
login += val.getCountLogin();
order += val.getCountOrder();
}
result.setCountLogin(login);
result.setCountOrder(order);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: UserOrderCount <in> <out>");
System.exit(2);
}
Job job = new Job(conf);
job.setJobName("UserOrderCount");
job.setJarByClass(UserOrderCount.class);
job.setMapperClass(SingleUserMapper.class);
job.setCombinerClass(SingleUserReducer.class);
job.setReducerClass(SingleUserReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountInformationTuple.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class CountInformationTuple implements Writable {
private int countOrder = 0;
private int countLogin = 0;
public int getCountOrder() {
return countOrder;
}
public void setCountOrder(int order) {
this.countOrder = order;
}
public int getCountLogin() {
return countLogin;
}
public void setCountLogin(int login) {
this.countLogin = login;
}
@Override
public void readFields(DataInput in) throws IOException {
countOrder = in.readInt();
countLogin = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(countLogin);
out.writeInt(countOrder);
}
@Override
public String toString() {
return "Logins: "+ countLogin + "\t" + "Orders:" + countOrder;
}
}
}
2条答案
按热度按时间mnowg1ta1#
对于一个有趣的:解决了我的“错误输出”-错误。
ymdaylpp2#
由于您希望得到一个文件,因此可以使用配置mapreduce作业
jobConf.setNumReduceTasks(1)
要仅使用单个reduce任务,请参阅jobconf javadoc以获取更多信息。现在你的“唯一的减少”任务得到了全部
login
以及order
对每个用户都有效。你可以把所有的数字加起来login
以及order
reduce任务中已处理记录的值,并在cleanup()方法中输出求和值,该方法仅在处理单个reduce任务的所有输入记录后调用一次。示例代码:您将得到一条记录作为输出,其中
login
以及order
. 您可以修改cleanup()
计算差异的方法和其他措施(如果需要)。