hadoop map reduce-如何将分组和排序分开?

olhwl3o2  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(273)

刚刚开始编写Hadoop MR作业。希望我们能很快切换到Spark,但目前我们只能使用MR。
我想将记录按其值的散列进行分组。但我想用一些完全不相关的东西对它们进行排序——它们值中的时间戳。我不知道怎样才能做到最好。我看到两种选择:
1) 首先运行一个MR作业,在mapper中为每个值计算散列,然后在reducer中把具有相同散列的所有记录按需要归并为一个值(实际上,这部分我已经按我们目前的需求实现好了)。然后串联第二个MR作业,根据值中的时间戳对上述reducer的输出重新排序。这样做效率低吗?
2) 我读过一些关于如何使用复合键的博客/帖子,所以也许我可以一步完成这一切?我会在mapper中创建某种复合键,它既包含用于分组的散列,也包含用于排序的时间戳。但我不清楚这是否可行。如果排序与分组完全无关,它还能正确分组吗?我也不确定需要实现哪些接口、创建哪些类,或者如何配置。
我说的不是二次排序(secondary sort)。我不关心每次reduce调用的迭代器中对象的顺序。我关心的是reducer输出内容的顺序,它需要按时间戳进行全局排序。
这样做的推荐方法是什么?

rekjcdws

rekjcdws1#

如果在reduce之前有一个封装分组和排序属性的复合键,那么这是绝对可能的。
假设您需要一个包含int散列码和long时间戳的键。那么您需要实现一个Writable元组(比如IntLongPair),并在其中定义用例所需的各种比较器和分区器。
所以您可以像下面这样配置作业(稍后我会给出IntLongPair的一种可能实现):

job.setPartitionerClass(IntLongPair.IntOnlyPartitioner.class); // partition by the hash code stored in the int part of the pair
job.setGroupingComparatorClass(IntLongPair.IntAscComparator.class); // group by the hash code only; ascending vs. descending should not matter for grouping
job.setSortComparatorClass(IntLongPair.IntDescLongAscComparator.class); // int descending, then long ascending (smallest long first within each int). NOTE(review): if "newest first" is wanted and the long is a timestamp, a long-descending order is needed instead - confirm


以下是您可以使用的IntLongPair实现:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

public class IntLongPair implements WritableComparable<IntLongPair> {

    private IntWritable intVal = new IntWritable();
    private LongWritable longVal = new LongWritable();

    public void write(DataOutput d) throws IOException {
        intVal.write(d);
        longVal.write(d);
    }

    public void readFields(DataInput di) throws IOException {
        intVal.readFields(di);
        longVal.readFields(di);
    }

    /**
     * Natural order is int first, long next
     * @param o
     * @return
     */
    public int compareTo(IntLongPair o) {
        int diff = intVal.compareTo(o.intVal);
        if (diff != 0) {
            return diff;
        }
        return longVal.compareTo(o.longVal);
    }

    public IntWritable getInt() {
        return intVal;
    }

    public void setInt(IntWritable intVal) {
        this.intVal = intVal;
    }

    public void setInt(int intVal) {
        this.intVal.set(intVal);
    }

    public LongWritable getLong() {
        return longVal;
    }

    public void setLong(LongWritable longVal) {
        this.longVal = longVal;
    }

    public void setLong(long longVal) {
        this.longVal.set(longVal);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final IntLongPair other = (IntLongPair) obj;
        if (this.intVal != other.intVal && (this.intVal == null || !this.intVal.equals(other.intVal))) {
            return false;
        }
        if (this.longVal != other.longVal && (this.longVal == null || !this.longVal.equals(other.longVal))) {
            return false;
        }
        return true;
    }

    @Override
    public int hashCode() {
        int hash = 3;
        hash = 47 * hash + (this.intVal != null ? this.intVal.hashCode() : 0);
        hash = 47 * hash + (this.longVal != null ? this.longVal.hashCode() : 0);
        return hash;
    }

    @Override
    public String toString() {
        return "IntLongPair{" + intVal + ',' + longVal + '}';
    }

    public IntWritable getFirst() {
        return intVal;
    }

    public LongWritable getSecond() {
        return longVal;
    }

    public void setFirst(IntWritable value) {
        intVal.set(value.get());
    }

    public void setSecond(LongWritable value) {
        longVal.set(value.get());
    }

    public static class Comparator extends WritableComparator {

        public Comparator() {
            super(IntLongPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return compareBytes(b1, s1, l1, b2, s2, l2);
        }
    }

    static {                                        // register this comparator
        WritableComparator.define(IntLongPair.class, new Comparator());
    }

    public static class IntDescLongAscComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
            if (comp != 0) {
                return -comp;
            }
            return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = o1.getInt().compareTo(o2.getInt());
            if (comp != 0) {
                return -comp;
            }
            return o1.getLong().compareTo(o2.getLong());
        }
    }

    public static class LongAscIntAscComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
            if (comp != 0) {
                return comp;
            }
            return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = o1.getLong().compareTo(o2.getLong());
            if (comp != 0) {
                return comp;
            }
            return  o1.getInt().compareTo(o2.getInt());
        }
    }

    public static class LongAscIntDescComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
            if (comp != 0) {
                return comp;
            }
            return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = o1.getLong().compareTo(o2.getLong());
            if (comp != 0) {
                return comp;
            }
            return -o1.getInt().compareTo(o2.getInt());
        }
    }

    public static class LongDescIntAscComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
            if (comp != 0) {
                return -comp;
            }
            return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = o1.getLong().compareTo(o2.getLong());
            if (comp != 0) {
                return -comp;
            }
            return o1.getInt().compareTo(o2.getInt());
        }
    }

    public static class LongDescIntDescComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
            if (comp != 0) {
                return -comp;
            }
            return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = o1.getLong().compareTo(o2.getLong());
            if (comp != 0) {
                return -comp;
            }
            return -o1.getInt().compareTo(o2.getInt());
        }
    }

    public static class IntAscComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            return o1.getInt().compareTo(o2.getInt());
        }
    }

    public static class IntDescComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            return -o1.getInt().compareTo(o2.getInt());
        }
    }

    public static class LongAscComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            return o1.getLong().compareTo(o2.getLong());
        }
    }

    public static class LongDescComparator implements RawComparator<IntLongPair> {

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
        }

        public int compare(IntLongPair o1, IntLongPair o2) {
            return -o1.getLong().compareTo(o2.getLong());
        }
    }

    /**
     * Partition based on the long part of the pair.
     */
    public static class LongOnlyPartitioner extends Partitioner<IntLongPair, Writable> {

        @Override
        public int getPartition(IntLongPair key, Writable value,
                int numPartitions) {
            return Math.abs(key.getLong().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /**
     * Partition based on the int part of the pair.
     */
    public static class IntOnlyPartitioner extends Partitioner<IntLongPair, Writable> {

        @Override
        public int getPartition(IntLongPair key, Writable value,
                int numPartitions) {
            return Math.abs(key.getInt().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
}

相关问题