hbase+hadoop的mapreduce使用负值给出错误的和

inkz8wg9  于 2021-06-03  发布在  Hadoop
关注(0)|答案(0)|浏览(210)

我需要检查一下MapReduce HBase 要求和的表 Double 价值观。我遵循hbase文档中的示例。
属性最初是通过将字符串转换为字节数组(使用hbase的 Bytes.toBytes(value) )但现在我需要把它们当作 Double 把他们的价值加起来。对于只有正值的列,它给出了正确的和,但是我有一个列也有一些负值(称为 diferenca 在下面的代码中)。
当我运行这个作业时,它给了我一个错误的答案,我注意到reduce任务就像是获取数字的模块,或者类似的东西。但是当我调试时,我看到解析的double对象 valor 正确地说是负数。我不知道这是什么原因。。。
作业设置:

job = new Job(hTable.getConfiguration(), "All");
            job.setJarByClass(HBaseQuery.class);
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            TableMapReduceUtil.initTableMapperJob(
                    tabela,        // input table
                    scan,               
                    CalculoTotaisMapper.class,     // mapper class
                    Text.class,         // mapper output key
                    DoubleWritable.class,  // mapper output value
                    job);
            TableMapReduceUtil.initTableReducerJob(
                    tabela,        // output table
                    CalculoTotaisReducer.class,    // reducer class
                    job);
            job.setNumReduceTasks(1);   // at least one, adjust as required

            boolean b = job.waitForCompletion(true);

我的Map器:

public class CalculoTotaisMapper extends TableMapper<Text, DoubleWritable> {

    public static final String[] ATTRS = 
      new String[]{"valortotalprestador", "valortotalconvenio", "diferenca"};

    private Text text = new Text();
    private Logger logger = LoggerFactory.getLogger(CalculoTotaisMapper.class);

    public void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {

        Double val = 0.0;
        for (String attr : ATTRS) {
            byte[] valueBytes = value.getValue("hc".getBytes(), attr.getBytes());
            String valueString = Bytes.toString(valueBytes);

            val = Double.parseDouble(valueString);

            text.set(attr);

            context.write(text, new DoubleWritable(val));

        }

    }
}

我的减速机:

public class CalculoTotaisReducer extends TableReducer <Text, DoubleWritable,
                                                   ImmutableBytesWritable> {

    public static final byte[] CF = "qu".getBytes();

    public void reduce(Text key, Iterable<DoubleWritable> values,
       Context context) throws IOException, InterruptedException {
        double i = 0.0;
        for (DoubleWritable val : values) {
            double valor =  val.get();
           i += valor;
        }

        Put put = new Put(Bytes.toBytes("all"));
        put.add(CF, key.getBytes(), Bytes.toBytes(i));

        context.write(null, put);
    }

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题