sparkflatmap/reduce:如何缩放和避免内存不足?

9cbw7uwe  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(370)

我正在将一些map reduce代码迁移到spark中,并且在构造函数中要返回的iterable时遇到问题。在mr代码中,我有一个按键分组的reduce函数,然后(使用multipleoutputs)将迭代这些值,并对某些代码使用write(在多个输出中,但这并不重要)(简化):

reduce(Key key, Iterable<Text> values) {
    // ... some code
    for (Text xml: values) {
        multipleOutputs.write(key, val, directory);
    }
}

然而,在spark中,我已经将一个map和这个reduce转换成一个序列:maptopair->groupbykey->flatmap,正如推荐的那样。。。在某本书里。
maptopair基本上通过functionmap添加一个键,functionmap基于记录上的一些值为该记录创建一个键。有时一把钥匙可能有很高的基数。

JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() { 
    public Tuple2<Key, String> call(String value) {
        //... 
        return functionMap.call(value);
    }
});

将rddpaired应用于rdd.groupbykey()以获取rdd以提供flatmap函数:

JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();

分组后,执行reduce的flatmap调用。在这里,操作是一种转换:

public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
    // some code...
    List<String> out = new ArrayList<String>();
    if (someConditionOnKey) { 
        // do a logic
        Grouper grouper = new Grouper();
        for (String xml : keyValue._2()) {
            // group in a separate class
            grouper.add(xml);
        }
        // operation is now performed on the whole group
        out.add(operation(grouper));
    } else {
        for (String xml : keyValue._2()) {
            out.add(operation(xml));
        }
        return out;
    }
}

很好用。。。没有太多记录的密钥。实际上,当一个有很多值的键进入reduce上的“else”时,它就会中断outofmemory。
注:我已经包括了“如果”部分来解释我想产生的逻辑,但失败发生在输入“否则”。。。因为当数据进入“else”时,通常意味着根据数据的性质,会有更多的值。
很明显,必须将所有分组的值都保存在“out”列表中,如果一个键有数百万条记录,它就不会进行缩放,因为它会将它们保存在内存中。我已经到了oom发生的时候了(是的,它是在执行上面的“操作”时,它要求内存,而没有给出内存)。不过,这并不是一个非常昂贵的内存操作)。
为了扩大规模,有没有办法避免这种情况?或者通过使用其他指令复制行为,以更具可伸缩性的方式达到相同的输出,或者能够手动触发用于合并的值(就像我以前对mr所做的那样)。。。

lymgl2op

lymgl2op1#

在室内做条件是低效的 flatMap 操作。您应该检查外部条件以创建两个不同的RDD并分别处理它们。

rddPaired.cache();

// groupFilterFunc will filter which items need grouping
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.filter(groupFilterFunc).groupByKey();
// processGroupedValuesFunction should call `operation` on group of all values with the same key and return the result
rddGrouped.mapValues(processGroupedValuesFunction);

// nogroupFilterFunc will filter which items don't need grouping
JavaPairRDD<Key, Iterable<String>> rddNoGrouped = rddPaired.filter(nogroupFilterFunc);
// processNoGroupedValuesFunction2 should call `operation` on a single value and return the result
rddNoGrouped.mapValues(processNoGroupedValuesFunction2);

相关问题