hadoop不理解复合键是相等的

omqzjyyz  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(399)

我的数据集具有以下格式:

userID mediaID rating

我想找出任何一对mediaid在所有用户中同时获得高于阈值的评级。为此,我遵循了几个示例来实现一个复合键。我写了一个pairkey类,它存储一个唯一的对,实现compareto并重写hashcode和equals。。。

public static class PairKey implements WritableComparable<PairKey> {

    private Integer lowID;
    private Integer highID;

    public PairKey() {

        this.lowID = -1;
        this.highID = -1;

    }

    public PairKey(Integer one, Integer two) {
        //should be impossible
        if (one.equals(two)) {
            throw new IllegalArgumentException("Cannot have a pair key with identical IDs");
        }
        if (one < two) {
            lowID = one;
            highID = two;
        }
        else {
            lowID = two;
            highID = one;
        }
    }

    public Integer getLowID() {
        return lowID;
    }

    public Integer getHighID() {
        return highID;
    }

    public void setLowID(Integer _lowID) {
        lowID = _lowID;
    }

    public void setHighID(Integer _highID) {
        highID = _highID;
    }

    @Override
    public int compareTo(PairKey other) {
        int _lowCompare = lowID.compareTo(other.getLowID());
        if (_lowCompare != 0) {
            return _lowCompare;
        }
        int _highCompare = highID.compareTo(other.getHighID());
        return _highCompare;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(lowID.intValue());
        dataOutput.writeInt(highID.intValue());
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        lowID = new Integer(dataInput.readInt());
        highID = new Integer(dataInput.readInt());
    }

    @Override
    public String toString() {
        return "<" + lowID + ", " + highID + ">";
    }

    @Override
    public boolean equals(Object o) {

        if (this == o) {
            return true;
        }
        if ( o == null || this.getClass() != o.getClass()) {
            return false;
        }

        PairKey other = (PairKey) o;

        //compare fields
        if (this.lowID != null ?    this.lowID.equals(other.getLowID()) == false  : other.getLowID() != null) return false;
        if (this.highID != null ?   this.highID.equals(other.getHighID()) == false : other.getHighID() != null) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int _lowHash = this.lowID.hashCode();
        int _highHash = this.highID.hashCode();
        return 163 * (_lowHash ) + _highHash;
    }
}

这是我的Map程序代码,我将所有超过阈值的movieid存储在每个用户的一个集合中,然后在这个集合中发出所有可能的对:

public static class PairMapper extends Mapper<Text, Text, PairKey, IntWritable> {

    private Map<Integer, SortedSet<Integer>> temp = new HashMap<Integer, SortedSet<Integer>>();
    private IntWritable one = new IntWritable(1);
    private PairKey _key = new PairKey();

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        Integer userID = new Integer(key.toString());
        String[] vals = value.toString().split("\t");
        String _movieID = vals[0];
        String _rating = vals[1];
        Integer movieID = new Integer(_movieID);
        Integer rating = new Integer(_rating);
        if (rating > 3) {
            SortedSet candidates  = temp.get(userID);
            if (candidates == null) {
                candidates = new TreeSet<Integer>();
            }
            candidates.add(movieID);
            temp.put(userID, candidates);

        }
    }//map

    public void cleanup(Context context) throws IOException, InterruptedException {

        for (Map.Entry<Integer, SortedSet<Integer>> e : temp.entrySet()) {

            SortedSet<Integer> _set = e.getValue();
            Integer [] arr = _set.toArray(new Integer[_set.size()]);
            for (int i = 0 ; i < arr.length-1 ; i++) {
                for (int j = i+1 ; j < arr.length ; j++) {
                    _key.setLowID(arr[i]);
                    _key.setHighID(arr[j]);
                    context.write(_key, one);
                }//for j

            }//for i

        }

    }//cleanup

}//PairMapper

这是我的减速机:

public static class PairReducer extends Reducer<PairKey, Iterable<IntWritable>, Text, IntWritable> {

    public void reduce(PairKey key, Iterable<IntWritable> vals, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : vals) {
            sum+= val.get();
        }//for
        IntWritable result = new IntWritable(sum);
        context.write(new Text(key.toString()), result);
    } //reduce

}

这是我的主要方法:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length != 2) {
        System.err.println("Usage: moviepairs <in> <out>");
        System.exit(2);
    }

    //CONFIGURE THE JOB
    Job job = new Job(conf, "movie pairs");

    job.setJarByClass(MoviePairs.class);

   job.setSortComparatorClass(CompositeKeyComparator.class);
   job.setPartitionerClass(NaturalKeyPartitioner.class);
   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);

    //map-reduce classes
    job.setMapperClass(PairMapper.class);
    job.setCombinerClass(PairReducer.class);
    job.setReducerClass(PairReducer.class);

    //key-val classes
    job.setMapOutputKeyClass(PairKey.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true)? 0 :1);

}

我希望在我的减速机里得到这个:

pair <1,2>: [1,1,1]

相反,减速机似乎并不理解成对的相等性。而是输出:

pair<1,2>: [1]
pair<1,2>: [1]
pair<1,2>: [1]

不知道我错过了什么。如您所见,我已经尝试了几种方法,比如添加一个定制的排序器(我认为我不需要它,并使用一个分组比较器、定制分区器),但我认为简单地重写hashcode/equals应该可以解决这个问题(不确定)。我在网上找到的所有例子似乎都遵循这一点,它们似乎都很有效。

ryhaxcpt

ryhaxcpt1#

像往常一样,对于那些初学者的问题,这个问题是完全不相关的。我把减速机接口搞砸了。而不是 <KEYIN, VALIN...> 我在做什么 <KEYIN, ITERABLE<VALIN>....>

相关问题