Reducer Iterable values seem inconsistent in Java MapReduce

fkaflof6  posted 2021-06-02  in  Hadoop
Follow (0) | Answers (1) | Views (403)

I have the following code in my reduce function. When I try to do a shallow copy with CollectionUtils.addAll, the copy fails: every item in the list ends up referencing the last item from the iterator instead of the distinct values.
Here is my code:

public void reduce(Text key, Iterable<ArrayListWritable<Writable>> values, Context context)
    throws IOException, InterruptedException {
  ArrayList<ArrayListWritable<Writable>> listOfWordPairs =
      new ArrayList<ArrayListWritable<Writable>>();

  // CollectionUtils.addAll(listOfWordPairs, values.iterator());
  // -> listOfWordPairs all seem to be the last item in the iterator

  Iterator<ArrayListWritable<Writable>> iter = values.iterator();

  // Manually do the copy
  while (iter.hasNext()) {
    // listOfWordPairs.add(iter.next());
    // -> same behaviour as CollectionUtils.addAll()

    // Only working way to do it -> deep copy :(
    listOfWordPairs.add(new ArrayListWritable<Writable>(iter.next()));
  }
}

Does anyone know why this happens? I can see that it would save a fair amount of memory if MR is implemented this way, but it feels like there is some magic going on here. I'm new to this, so I hope the question isn't too silly...
Here is my map code, for anyone interested:

@Override
public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  Map<String, HMapStFW> stripes = new HashMap<>();

  List<String> tokens = Tokenizer.tokenize(value.toString());

  if (tokens.size() < 2) return;
  context.getCounter(StripesPmiEnums.TOTALENTRIES).increment(tokens.size());

  for (int i = 0; i < tokens.size() && i < 40; i++) {
    for (int j = 0; j < tokens.size() && j < 40; j++) {
      if (j == i)
        continue;

      // Make stripe if it doesn't exist
      if (!stripes.containsKey(tokens.get(i))) {
        HMapStFW newStripe = new HMapStFW();
        stripes.put(tokens.get(i), newStripe);
      }

      HMapStFW stripe = stripes.get(tokens.get(i));
      if (stripe.containsKey(tokens.get(j))) {
        stripe.put(tokens.get(j), stripe.get(tokens.get(j)) + 1.0f);
      } else {
        stripe.put(tokens.get(j), 1.0f);
      }
    }
  }

  for (String word1 : stripes.keySet()) {
    TEXT.set(word1);
    context.write(TEXT, stripes.get(word1));
  }
}

ArrayListWritable is also available here: https://github.com/lintool/tools/blob/master/lintools-datatypes/src/main/java/tl/lin/data/array/arraylistwritable.java


klsxnrf11#

This is because of how the iterator works in the reducer. In short, you have to clone the objects as you iterate over them:

while (iter.hasNext()) {
    // this is correct
    listOfWordPairs.add(new ArrayListWritable<Writable>(iter.next()));
}

Take a look at the link below; it explains this very well:
https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/
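The pitfall can be reproduced without Hadoop. The sketch below (plain Java, with a hypothetical `Holder` class standing in for a `Writable`) mimics what Hadoop's reducer does: the framework keeps one value object and deserializes each record into it, so `next()` always returns the same mutated instance. Storing the reference gives you N aliases of the last value; copying the contents (as the `ArrayListWritable` copy constructor does) preserves each one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ObjectReuseDemo {
    // Mutable holder standing in for a Writable (hypothetical stand-in,
    // not the real org.apache.hadoop.io.Writable)
    static class Holder {
        int value;
        Holder(int value) { this.value = value; }
        Holder(Holder other) { this.value = other.value; } // "deep" copy
    }

    // Iterable that mimics Hadoop's reducer values: a single shared
    // instance is mutated ("deserialized into") on every next() call
    static Iterable<Holder> reusedValues(int... data) {
        return () -> new Iterator<Holder>() {
            final Holder shared = new Holder(0); // the one reused object
            int i = 0;
            public boolean hasNext() { return i < data.length; }
            public Holder next() { shared.value = data[i++]; return shared; }
        };
    }

    public static void main(String[] args) {
        List<Holder> shallow = new ArrayList<>();
        List<Holder> deep = new ArrayList<>();
        for (Holder h : reusedValues(1, 2, 3)) {
            shallow.add(h);          // stores the same reference three times
            deep.add(new Holder(h)); // copies the current contents
        }
        // every shallow entry aliases the shared object, now holding 3
        System.out.println(shallow.get(0).value + " " + shallow.get(1).value); // 3 3
        // the copies keep each distinct value
        System.out.println(deep.get(0).value + " " + deep.get(1).value);       // 1 2
    }
}
```

This is also why simply holding `iter.next()` or using `CollectionUtils.addAll` fails in the reducer: both store references to the one reused object, not its successive contents.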
