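My reduce function in Java writes unexpected values to the output file. I stepped through the code with breakpoints and confirmed that for every `context.write` call I make, the key and value being written are correct. Where am I going wrong?
What I'm trying to do is take input lines of the form date,customer,vendor,amount, each representing a transaction, and produce a dataset with lines of the form date,user,balance, where balance is the sum of all transactions in which the user appears, both as customer and as vendor.
Here is my code: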
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Transactions {

        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, Text> {

            public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
                var splittedValues = value.toString().split(",");
                var date = splittedValues[0];
                var customer = splittedValues[1];
                var vendor = splittedValues[2];
                var amount = splittedValues[3];
                var reduceValue = new Text(customer + "," + vendor + "," + amount);
                context.write(new Text(date), reduceValue);
            }
        }

        public static class IntSumReducer
                extends Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterable<Text> values,
                               Context context
                    ) throws IOException, InterruptedException {
                Map<String, Integer> balanceByUserId = new ConcurrentHashMap<>();
                values.forEach(transaction -> {
                    var splittedTransaction = transaction.toString().split(",");
                    var customer = splittedTransaction[0];
                    var vendor = splittedTransaction[1];
                    var amount = 0;
                    if (splittedTransaction.length > 2) {
                        amount = Integer.parseInt(splittedTransaction[2]);
                    }
                    if (!balanceByUserId.containsKey(customer)) {
                        balanceByUserId.put(customer, 0);
                    }
                    if (!balanceByUserId.containsKey(vendor)) {
                        balanceByUserId.put(vendor, 0);
                    }
                    balanceByUserId.put(customer, balanceByUserId.get(customer) - amount);
                    balanceByUserId.put(vendor, balanceByUserId.get(vendor) + amount);
                });
                balanceByUserId.entrySet().forEach(entry -> {
                    var reducerValue = new Text(entry.getKey() + "," + entry.getValue().toString());
                    try {
                        context.write(key, reducerValue);
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "transactions");
            job.setJarByClass(Transactions.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
1 Answer
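"where balance is the sum of all transactions in which the user appears, both as customer and as vendor"
`balanceByUserId` only exists per unique date, because your map key is the date. If you want to **aggregate by customer information** (name/ID?), then `customer` should be the key of the mapper output. Once all of a customer's data has been grouped at the reducer, you can sort it by date if needed, and also aggregate on other details.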
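To make the re-keying idea concrete, here is a minimal plain-Java sketch that simulates it in memory. It is not Hadoop API code. The class name `UserBalances`, the method `balancesByUser`, and the sample transactions are all made up for illustration, and it assumes every line has the form date,customer,vendor,amount with an integer amount. Each transaction contributes one record per user (negative for the customer, positive for the vendor, matching the sign convention in the question), and records are then summed per user key rather than per date.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of keying by user; hypothetical names, not Hadoop API code.
class UserBalances {

    // Assumes each line is "date,customer,vendor,amount" with an integer amount.
    static Map<String, Integer> balancesByUser(List<String> lines) {
        // TreeMap keeps users sorted, so the printed result is deterministic.
        Map<String, Integer> balances = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String customer = fields[1];
            String vendor = fields[2];
            int amount = Integer.parseInt(fields[3].trim());
            // "Map" step: emit (customer, -amount) and (vendor, +amount).
            // "Reduce" step: sum every amount that shares the same user key.
            balances.merge(customer, -amount, Integer::sum);
            balances.merge(vendor, amount, Integer::sum);
        }
        return balances;
    }

    public static void main(String[] args) {
        // Made-up sample data spanning two dates.
        List<String> lines = List.of(
                "2021-01-01,alice,bob,10",
                "2021-01-02,bob,alice,4",
                "2021-01-02,carol,alice,1");
        System.out.println(balancesByUser(lines)); // prints {alice=-5, bob=6, carol=-1}
    }
}
```

In an actual job, the equivalent change would presumably be for the mapper to write two records per input line, keyed by the user instead of the date, so that the reducer receives all of a user's amounts together regardless of date.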
It's also worth pointing out that this is easier to do in Hive or SparkSQL than in MapReduce.