java - Efficiently obtaining the Cartesian product over multiple pairs using Apache Spark

zz2j4svz · asked 2021-05-29 · in Spark
Follow (0) | Answers (1) | Views (504)

Old context

I am performing a computation on my dataset that requires each element to be paired with every other element, i.e. by calling mapToPair on a JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>. This combination is produced with cartesian, as follows:

JavaPairRDD<String, List<Double>> keyvals;
...
JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>> combined = keyvals.cartesian(keyvals).filter(tpl -> !tpl._1._1.equals(tpl._2._1));

combined.mapToPair(tpl -> {
    // Pair of ids identifying this combination
    Tuple2<String, String> ids = new Tuple2<>(tpl._1._1, tpl._2._1);

    // Compute the pairwise result from the two value lists
    double result = calculateResult(tpl._1._2, tpl._2._2);

    return new Tuple2<>(ids, result);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
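For reference, a minimal construction of keyvals (the sample ids and vectors here are made up; the question does not show the real data source or the body of calculateResult):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

SparkConf conf = new SparkConf().setAppName("triple-cartesian").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Sample data only; the real ids and vectors come from the actual dataset
JavaPairRDD<String, List<Double>> keyvals = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", Arrays.asList(1.0, 2.0)),
        new Tuple2<>("b", Arrays.asList(3.0, 4.0)),
        new Tuple2<>("c", Arrays.asList(5.0, 6.0))));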

New context

I have now extended the method calculateResult to accept three List<Double> arguments (rather than two, as in the example above). This requires the dataset to be combined with itself twice. Here, however, cartesian seems to fall short.
My question therefore is: how can I combine my data (keyvals) twice, essentially producing something that matches JavaPairRDD<Tuple2<...>, Tuple2<...>, Tuple2<...>> (pseudocode)?
My goal is to call calculateResult(List<Double> s1, List<Double> s2, List<Double> s3) on every cross-combined triple. I suspect I am not taking the right approach in trying to extend the cartesian example I gave above, but I don't know what the correct steps would be.
Unfortunately, I am restricted to Spark for Java 2.4.x.
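To illustrate why the flat three-way type above is only pseudocode: chaining cartesian twice does compile, but it produces a nested pair rather than a flat triple, as in this sketch:

// Chaining cartesian nests the tuples instead of flattening them:
// the left element is itself a Tuple2 of (id, values) pairs.
JavaPairRDD<Tuple2<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>,
            Tuple2<String, List<Double>>> nested =
        keyvals.cartesian(keyvals).cartesian(keyvals);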

laawzig2 1#

Hope this helps.
I have added inline comments to the code to describe what it is doing. I used List instead of Tuple3 in case you need to perform more cartesian joins:
JavaPairRDD<List<String>, List<List<Double>>> result =
        keyvals.cartesian(keyvals)
                // Skip pairs that share the same id
                .filter(tpl -> !tpl._1._1.equals(tpl._2._1))
                // Perform the 2nd cartesian to form triples
                .cartesian(keyvals)
                // Skip triples whose 3rd id duplicates the 1st or 2nd id
                .filter(tpl -> !tpl._1._1._1.equals(tpl._2._1)
                        && !tpl._1._2._1.equals(tpl._2._1))
                // Map the result to a pair of ids (List<String>) and values (List<List<Double>>)
                .mapToPair(tpl -> {
                    // Combine the three ids into a single list
                    List<String> keys = new ArrayList<>();
                    keys.add(tpl._1._1._1);
                    keys.add(tpl._1._2._1);
                    keys.add(tpl._2._1);

                    // Combine the three value lists into a single list
                    List<List<Double>> values = new ArrayList<>();
                    values.add(tpl._1._1._2);
                    values.add(tpl._1._2._2);
                    values.add(tpl._2._2);

                    // Return a tuple of ids and values, both of fixed size 3
                    return new Tuple2<>(keys, values);
                });

result.mapToPair(tpl -> {
    Tuple3<String, String, String> ids = new Tuple3<>(tpl._1.get(0), tpl._1.get(1), tpl._1.get(2));
    // Renamed from "result": a lambda body cannot shadow the local variable above
    double score = calculateResult(tpl._2.get(0), tpl._2.get(1), tpl._2.get(2));
    return new Tuple2<>(ids, score);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
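One design note: the double cartesian produces every ordering of each triple, so calculateResult runs six times per unordered combination. If calculateResult is symmetric in its arguments (an assumption about your use case), an extra ordering filter keeps each combination exactly once:

// Assumes calculateResult(s1, s2, s3) is order-insensitive: keep only
// triples whose ids are in strictly ascending order, one per combination.
JavaPairRDD<List<String>, List<List<Double>>> unique = result.filter(tpl ->
        tpl._1.get(0).compareTo(tpl._1.get(1)) < 0
        && tpl._1.get(1).compareTo(tpl._1.get(2)) < 0);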
Note: migrating this Spark Java RDD code to the Spark DataFrame API would shorten the code and reduce its complexity.
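To give an idea of that migration, here is a rough sketch (untested, and assuming a hypothetical Dataset<Row> keyvalsDf with columns id and vals) of the same triple cross join with DataFrames:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical: keyvalsDf has columns "id" (string) and "vals" (array<double>)
Dataset<Row> a = keyvalsDf.select(col("id").as("id1"), col("vals").as("vals1"));
Dataset<Row> b = keyvalsDf.select(col("id").as("id2"), col("vals").as("vals2"));
Dataset<Row> c = keyvalsDf.select(col("id").as("id3"), col("vals").as("vals3"));

Dataset<Row> triples = a.crossJoin(b)
        .filter(col("id1").notEqual(col("id2")))
        .crossJoin(c)
        .filter(col("id3").notEqual(col("id1"))
                .and(col("id3").notEqual(col("id2"))));
// calculateResult could then be applied via a UDF over vals1, vals2, vals3.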
