我的数据集具有以下格式:
userID mediaID rating
我想找出任何一对mediaid在所有用户中同时获得高于阈值的评级。为此,我遵循了几个示例来实现一个复合键。我写了一个pairkey类,它存储一个唯一的对,实现compareto并重写hashcode和equals。。。
public static class PairKey implements WritableComparable<PairKey> {
private Integer lowID;
private Integer highID;
public PairKey() {
this.lowID = -1;
this.highID = -1;
}
public PairKey(Integer one, Integer two) {
//should be impossible
if (one.equals(two)) {
throw new IllegalArgumentException("Cannot have a pair key with identical IDs");
}
if (one < two) {
lowID = one;
highID = two;
}
else {
lowID = two;
highID = one;
}
}
public Integer getLowID() {
return lowID;
}
public Integer getHighID() {
return highID;
}
public void setLowID(Integer _lowID) {
lowID = _lowID;
}
public void setHighID(Integer _highID) {
highID = _highID;
}
@Override
public int compareTo(PairKey other) {
int _lowCompare = lowID.compareTo(other.getLowID());
if (_lowCompare != 0) {
return _lowCompare;
}
int _highCompare = highID.compareTo(other.getHighID());
return _highCompare;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(lowID.intValue());
dataOutput.writeInt(highID.intValue());
}
@Override
public void readFields(DataInput dataInput) throws IOException {
lowID = new Integer(dataInput.readInt());
highID = new Integer(dataInput.readInt());
}
@Override
public String toString() {
return "<" + lowID + ", " + highID + ">";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if ( o == null || this.getClass() != o.getClass()) {
return false;
}
PairKey other = (PairKey) o;
//compare fields
if (this.lowID != null ? this.lowID.equals(other.getLowID()) == false : other.getLowID() != null) return false;
if (this.highID != null ? this.highID.equals(other.getHighID()) == false : other.getHighID() != null) return false;
return true;
}
@Override
public int hashCode() {
int _lowHash = this.lowID.hashCode();
int _highHash = this.highID.hashCode();
return 163 * (_lowHash ) + _highHash;
}
}
这是我的Map程序代码,我将所有超过阈值的movieid存储在每个用户的一个集合中,然后在这个集合中发出所有可能的对:
public static class PairMapper extends Mapper<Text, Text, PairKey, IntWritable> {
private Map<Integer, SortedSet<Integer>> temp = new HashMap<Integer, SortedSet<Integer>>();
private IntWritable one = new IntWritable(1);
private PairKey _key = new PairKey();
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
Integer userID = new Integer(key.toString());
String[] vals = value.toString().split("\t");
String _movieID = vals[0];
String _rating = vals[1];
Integer movieID = new Integer(_movieID);
Integer rating = new Integer(_rating);
if (rating > 3) {
SortedSet candidates = temp.get(userID);
if (candidates == null) {
candidates = new TreeSet<Integer>();
}
candidates.add(movieID);
temp.put(userID, candidates);
}
}//map
public void cleanup(Context context) throws IOException, InterruptedException {
for (Map.Entry<Integer, SortedSet<Integer>> e : temp.entrySet()) {
SortedSet<Integer> _set = e.getValue();
Integer [] arr = _set.toArray(new Integer[_set.size()]);
for (int i = 0 ; i < arr.length-1 ; i++) {
for (int j = i+1 ; j < arr.length ; j++) {
_key.setLowID(arr[i]);
_key.setHighID(arr[j]);
context.write(_key, one);
}//for j
}//for i
}
}//cleanup
}//PairMapper
这是我的减速机:
public static class PairReducer extends Reducer<PairKey, Iterable<IntWritable>, Text, IntWritable> {
public void reduce(PairKey key, Iterable<IntWritable> vals, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : vals) {
sum+= val.get();
}//for
IntWritable result = new IntWritable(sum);
context.write(new Text(key.toString()), result);
} //reduce
}
这是我的主要方法:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: moviepairs <in> <out>");
System.exit(2);
}
//CONFIGURE THE JOB
Job job = new Job(conf, "movie pairs");
job.setJarByClass(MoviePairs.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
//map-reduce classes
job.setMapperClass(PairMapper.class);
job.setCombinerClass(PairReducer.class);
job.setReducerClass(PairReducer.class);
//key-val classes
job.setMapOutputKeyClass(PairKey.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)? 0 :1);
}
我希望在我的减速机里得到这个:
pair <1,2>: [1,1,1]
相反,减速机似乎并不理解成对的相等性。而是输出:
pair<1,2>: [1]
pair<1,2>: [1]
pair<1,2>: [1]
不知道我错过了什么。如您所见,我已经尝试了几种方法,比如添加一个定制的排序器(我认为我不需要它,并使用一个分组比较器、定制分区器),但我认为简单地重写hashcode/equals应该可以解决这个问题(不确定)。我在网上找到的所有例子似乎都遵循这一点,它们似乎都很有效。
1条答案
按热度按时间ryhaxcpt1#
像往常一样,对于那些初学者的问题,这个问题是完全不相关的。我把减速机接口搞砸了。而不是
<KEYIN, VALIN...>
我在做什么<KEYIN, ITERABLE<VALIN>....>