比较dataframes中的列值并返回与sparkjava8中的值不同的列名

zvokhttg  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(465)

我有两个Dataframe,每个Dataframe有230列,我想比较一个键列上的两个Dataframe,如果使用spark的java8中的列值有任何差异的话,就从这两个Dataframe中获取列名称和值。

id  Col_1 Col_2 Col_3 Col_4 Col_5
1    A     B      C     D     E
2    X     Y      Z     P     Q

id  Col_1 Col_2 Col_3 Col_4 Col_5
1    A     B6     C     D     E
2    X     Y      Z8    P     Q3

输出

id  Col_1 Col_2   Col_3 Col_4 Col_5
1   null  [B,B6]  null  null   null
2   null  null   [Z,Z8] null   [Q,Q3]

使用spark和java8

Df1.except(DF2);

StructType one = DF1.schema();

JavaPairRDD<String, Row> pair1 = DF1.toJavaRDD()
        .mapToPair(new PairFunction<Row, String, Row>() {
            public Tuple2<String, Row> call(Row row) {
                return new Tuple2<String, Row>(row.getString(0), row);
            }
        });

JavaPairRDD<String, Row> pair2 = DF2.toJavaRDD()
        .mapToPair(new PairFunction<Row, String, Row>() {
            public Tuple2<String, Row> call(Row row) {
                return new Tuple2<String, Row>(row.getString(0), row);
            }
        });

JavaPairRDD<String, Row> subs = pair1.subtractByKey(pair2);
JavaRDD<Row> rdd = subs.values();
Dataset<Row> diff = spark.createDataFrame(rdd, one);
diff.show();

请帮忙。

hfwmuf9z

hfwmuf9z1#

请找到下面的解决方案,我试图通过保持dataframes为dataframes来解决这个问题,您可以找到代码解释的内联注解。实际的解决方案从下面的第//行开始

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.spark.sql.functions.*;

public class CompareDfs {

    public static void main(String[] args) {
        SparkSession spark = Constant.getSparkSess();

        List<String> list1 = new ArrayList<>();
        list1.add("1,A,B,C,D,E");
        list1.add("2,X,Y,Z,P,Q");

        List<String> list2 = new ArrayList<>();
        list2.add("1,A,B6,C,D,E");
        list2.add("2,X,Y,Z8,P,Q3");

        Dataset<Row> df = spark.createDataset(list1, Encoders.STRING()).toDF().selectExpr("split(value, ',')[0] as id",
                "split(value, ',')[1] as Col_1",
                "split(value, ',')[2] as Col_2",
                "split(value, ',')[3] as Col_3",
                "split(value, ',')[4] as Col_4",
                "split(value, ',')[5] as Col_5");
//        df.printSchema();
//        df.show();
        // Convert
        Dataset<Row> df1 = spark.createDataset(list2, Encoders.STRING()).toDF().selectExpr("split(value, ',')[0] as id",
                "split(value, ',')[1] as Col_1",
                "split(value, ',')[2] as Col_2",
                "split(value, ',')[3] as Col_3",
                "split(value, ',')[4] as Col_4",
                "split(value, ',')[5] as Col_5");
//        df1.printSchema();
//        df1.show();

        //Below is the solution
        List<String> columns = Arrays.asList("Col_1", "Col_2", "Col_3", "Col_4", "Col_5"); // List of columns to merge

        // inner join the 2 dataframes
        Dataset<Row> joinedDf = df.join(df1).where(df.col("id").equalTo(df1.col("id")));

        // Iterate throgh the columns
        for (String column : columns) {
            joinedDf = joinedDf
                    .withColumn(column + "_temp",
                            when(df.col(column).equalTo(df1.col(column)), null) // When and otherwise clause for column to array/nul transformation
                                    .otherwise(split(concat_ws(",", df.col(column), df1.col(column)), ",")))
                    .drop(df.col(column)) // Drop column from 1st dataframe
                    .drop(df1.col(column)) // Drop column from 2nd dataframe
                    .withColumnRenamed(column + "_temp", column); // Rename column to the result column name
        }

//                .withColumn("Col_2_t",when(df.col("Col_2").equalTo(df1.col("Col_2")), null ).otherwise(split(concat_ws(",",df.col("Col_2"),df1.col("Col_2")),",")))
        joinedDf.show();

    }
}
kxkpmulp

kxkpmulp2#

我试图用Dataframe方法解决这个问题-

List<Column> cols = Arrays.stream(df1.columns())
                .map(c -> {
                    if (c.equalsIgnoreCase("id"))
                        return col("a.id");
                    else
                        return array(toScalaSeq(Arrays.asList(col("a."+c), col("b."+c))).toBuffer()).as(c);
                }).collect(Collectors.toList());
        Dataset<Row> processedDf = df1.as("a").join(df2.as("b"), df1.col("id").equalTo(df2.col("id")))
                .select(toScalaSeq(cols).toBuffer());

        List<Column> cols1 =  Arrays.stream(df1.columns())
                .map(f -> {
                    if (f.equalsIgnoreCase("id"))
                        return expr(f);
                    else
                        return expr("if(size(array_distinct(" + f + "))==1, NULL, " + f + " ) as " + f);
                }).collect(Collectors.toList());

        processedDf.select(toScalaSeq(cols1).toBuffer())
                .show(false);
        /**
         * +---+-----+-------+-------+-----+-------+
         * |id |Col_1|Col_2  |Col_3  |Col_4|Col_5  |
         * +---+-----+-------+-------+-----+-------+
         * |1  |null |[B, B6]|null   |null |null   |
         * |2  |null |null   |[Z, Z8]|null |[Q, Q3]|
         * +---+-----+-------+-------+-----+-------+
         */

请参考此处的完整代码-要点

相关问题