spark-rdds与列表的连接操作

bprjcwpo  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(296)

我有以下RDD:

JavaPairRDD<List<String>, String> firstRDD = ...
firstRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent], Father

JavaPairRDD<List<String>, String> secondRDD = ...
secondRDD.foreach(row -> System.out.println(row._1() + ", " + row._2()));
// [Man, Parent, Father], Person

我想执行一个内部连接,如果左键在右键的子列表中(在前一个例子中, [Man, Parent][Man, Parent, Father] ).
有什么建议吗?
谢谢!

a5g8bdjr

a5g8bdjr1#

对于RDD(以及javapairrdds),连接操作只能检查完全匹配的键。
因此,我们必须将RDD转换为Dataframe:

public static Dataset<Row> toDataframe(SparkSession spark, JavaPairRDD<List<String>, String> rdd) {
    JavaRDD<Row> rowRDD1 = rdd.map(tuple -> {
        Seq<String> key = JavaConverters.asScalaIteratorConverter(tuple._1().iterator()).asScala().toSeq();
        return RowFactory.create(key, tuple._2());
    });
    StructType st = new StructType()
            .add(new StructField("key", DataTypes.createArrayType(DataTypes.StringType), true, new MetadataBuilder().build()))
            .add(new StructField("value", DataTypes.StringType, true, new MetadataBuilder().build()));
    return spark.createDataFrame(rowRDD1, st);
}

对于连接条件,我们需要一个udf来检查一个数组是否是另一个数组的一部分。如果元素的顺序不重要,也可以使用array\u intersect。

UserDefinedFunction contains = functions.udf((Seq<String> a, Seq<String> b) -> b.containsSlice(a), DataTypes.BooleanType);

把这两个元素放在一起

Dataset<Row> df1 = toDataframe(spark, firstRDD);
Dataset<Row> df2 = toDataframe(spark, secondRDD);
Dataset<Row> result = df1.join(df2,contains.apply(df1.col("key"), df2.col("key")));

输入数据

firstRDD        secondRDD
+------+-----+  +------------+-----+
|   key|value|  |         key|value|
+------+-----+  +------------+-----+
|[a, b]|    A|  |   [a, b, c]|    C|
|[b, a]|    B|  |[a, b, c, d]|    D|
+------+-----+  +------------+-----+

我们得到

+------+-----+------------+-----+
|   key|value|         key|value|
+------+-----+------------+-----+
|[a, b]|    A|   [a, b, c]|    C|
|[a, b]|    A|[a, b, c, d]|    D|
+------+-----+------------+-----+

请注意,使用自定义项作为联接条件可能不是最快的选择。

相关问题