Hadoop MapReduce: reduce function is writing unexpected values

h6my8fg2 · asked 2022-11-01 · Hadoop

My reduce function in Java is writing unexpected values to the output file. I stepped through the code with breakpoints, and for every context.write call I make, the key and the value being written are correct. Where is my mistake?
What I am trying to do is take input rows of the form date, customer, vendor, amount, each representing a transaction, and produce a dataset with rows of the form date, user, balance, where balance is the sum of all transactions in which the user appears as either the customer or the vendor.
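For concreteness, a hypothetical example (the names and amounts are made up). Given input lines such as:

2022-10-01,alice,bob,100
2022-10-01,alice,carol,50

I would expect output along the lines of (key and value separated by a tab, the TextOutputFormat default):

2022-10-01	alice,-150
2022-10-01	bob,100
2022-10-01	carol,50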
Here is my code:

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Transactions {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, Text>{

        // input line format: date,customer,vendor,amount
        @Override
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            var splittedValues = value.toString().split(",");
            var date = splittedValues[0];
            var customer = splittedValues[1];
            var vendor = splittedValues[2];
            var amount = splittedValues[3];
            // key by date; pass customer, vendor and amount through as one CSV value
            var reduceValue = new Text(customer + "," + vendor + "," + amount);
            context.write(new Text(date), reduceValue);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text,Text,Text,Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values,
                           Context context
        ) throws IOException, InterruptedException {
            // running balance per user, scoped to this reduce key (the date)
            Map<String, Integer> balanceByUserId = new ConcurrentHashMap<>();
            values.forEach(transaction -> {
                var splittedTransaction = transaction.toString().split(",");
                var customer = splittedTransaction[0];
                var vendor = splittedTransaction[1];
                var amount = 0;
                // records with fewer than three fields keep the default amount of 0
                if (splittedTransaction.length > 2) {
                    amount = Integer.parseInt(splittedTransaction[2]);
                }
                if (!balanceByUserId.containsKey(customer)) {
                    balanceByUserId.put(customer, 0);
                }
                if (!balanceByUserId.containsKey(vendor)) {
                    balanceByUserId.put(vendor, 0);
                }
                balanceByUserId.put(customer, balanceByUserId.get(customer) - amount);
                balanceByUserId.put(vendor, balanceByUserId.get(vendor) + amount);
            });

            balanceByUserId.forEach((user, balance) -> {
                var reducerValue = new Text(user + "," + balance);
                try {
                    context.write(key, reducerValue);
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "transactions");
        job.setJarByClass(Transactions.class);
        job.setMapperClass(TokenizerMapper.class);
        // the reducer is also registered as the combiner, so reduce() may receive
        // its own two-field "user,balance" output instead of "customer,vendor,amount"
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

wfypjpf4 · answer 1

> where balance is the sum of all transactions in which the user is either the customer or the vendor

balanceByUserId only exists per unique date, because your map key is the date.
If you want to aggregate by customer information (name / ID?), then **customer** should be the key of the mapper output.
Once all the data for each customer has been grouped at the reducer, you can sort by date (if needed) and also aggregate on the other details; see the sketch below.
It is also worth pointing out that this is much easier to do in Hive or SparkSQL than in MapReduce.
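A minimal sketch of that suggestion, assuming the same CSV input and that one overall balance per user is wanted (the class names UserMapper and BalanceReducer, and the switch to IntWritable, are illustrative additions, not from the original post). Each transaction is emitted twice, keyed by user: once for the customer with the amount negated and once for the vendor with the amount as-is, so the reducer only has to sum. It slots into the same Transactions class, with org.apache.hadoop.io.IntWritable added to the imports:

public static class UserMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // input line: date,customer,vendor,amount
        var fields = value.toString().split(",");
        var amount = Integer.parseInt(fields[3]);
        // the customer pays the amount, the vendor receives it
        context.write(new Text(fields[1]), new IntWritable(-amount));
        context.write(new Text(fields[2]), new IntWritable(amount));
    }
}

public static class BalanceReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text user, Iterable<IntWritable> amounts, Context context)
            throws IOException, InterruptedException {
        var balance = 0;
        for (var amount : amounts) {
            balance += amount.get();
        }
        context.write(user, new IntWritable(balance));
    }
}

Because this reducer just sums integers, its output format matches its input format, so it can also safely be registered as the combiner; the original IntSumReducer cannot, since it consumes customer,vendor,amount values but emits user,balance pairs, and using it as a combiner feeds the reducer values it was never written to parse. In main, job.setOutputValueClass(IntWritable.class) would replace the Text value class.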
