Does any function in PySpark generate the same consecutive set of unique values?

dm7nw8vv · asked 2021-05-29 · in Spark
Follow (0) | Answers (1) | Views (333)

Is there any function in PySpark that generates the same set of unique values (the way numpy.arange() does), which could be used as a common column between two DataFrames and later be used to join them?


wgxvkvu9 · answer 1#

Below are the approaches I can think of for generating a unique value from all of a row's column values -

Load the test data

String data = " firstname|  lastname|  age\n" +
        " John      | Doe      | 21\n" +
        " John.     | Doe.     | 21\n" +
        " Mary.     | William. | 30";

// split on "\n" to match the literal separators used above
// (System.lineSeparator() would be "\r\n" on Windows and break the split)
List<String> list1 = Arrays.stream(data.split("\n"))
        .map(s -> Arrays.stream(s.split("\\|"))
                .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                .collect(Collectors.joining(","))
        )
        .collect(Collectors.toList());

Dataset<Row> df2 = spark.read()
        .option("header", true)
        .option("inferSchema", true)
        .option("sep", ",")
        .csv(spark.createDataset(list1, Encoders.STRING()));

df2.show(false);
df2.printSchema();
        /**
         * +---------+--------+---+
         * |firstname|lastname|age|
         * +---------+--------+---+
         * |John     |Doe     |21 |
         * |John.    |Doe.    |21 |
         * |Mary.    |William.|30 |
         * +---------+--------+---+
         *
         * root
         *  |-- firstname: string (nullable = true)
         *  |-- lastname: string (nullable = true)
         *  |-- age: integer (nullable = true)
         */

Create a list of all the columns in the DataFrame

List<Column> allCols = Arrays.stream(df2.columns()).map(functions::col).collect(Collectors.toList());

Approach 1: compute sha2 over all the columns

The Wikipedia page gives an estimate of the likelihood of a collision. If you run the numbers, you will see that all the hard disks ever produced on Earth could not hold enough 1 MB files to reach even a 0.01% likelihood of a SHA-256 collision. Basically, you can ignore that possibility.
Please note: this approach generates the id as a string.

df2.withColumn("stringId", sha2(concat_ws(":", toScalaSeq(allCols)), 256))
        .show(false);
        /**
         * run-1
         * +---------+--------+---+----------------------------------------------------------------+
         * |firstname|lastname|age|stringId                                                        |
         * +---------+--------+---+----------------------------------------------------------------+
         * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
         * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
         * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
         * +---------+--------+---+----------------------------------------------------------------+
         * run-2
         * +---------+--------+---+----------------------------------------------------------------+
         * |firstname|lastname|age|stringId                                                        |
         * +---------+--------+---+----------------------------------------------------------------+
         * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
         * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
         * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
         * +---------+--------+---+----------------------------------------------------------------+
         * run-3
         * +---------+--------+---+----------------------------------------------------------------+
         * |firstname|lastname|age|stringId                                                        |
         * +---------+--------+---+----------------------------------------------------------------+
         * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
         * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
         * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
         * +---------+--------+---+----------------------------------------------------------------+
         */
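Since the original question is about joining two DataFrames, here is a minimal sketch of how the derived key could be used for that. It assumes a hypothetical second DataFrame df3 with the same columns; the key is computed the same way on both sides, so matching rows get matching keys on every run.

Dataset<Row> left = df2.withColumn("stringId",
        sha2(concat_ws(":", toScalaSeq(allCols)), 256));
// df3 is a hypothetical second DataFrame with the same columns
Dataset<Row> right = df3.withColumn("stringId",
        sha2(concat_ws(":", toScalaSeq(allCols)), 256));

// join on the content-derived key
left.join(right, "stringId").show(false);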

Approach 2: use the row_number function

Since there is no partitionBy clause, the window moves all data to a single partition, so performance will degrade on large datasets.
Please note: this approach generates the id as a number.

df2.withColumn("number", row_number().over(Window.orderBy(toScalaSeq(allCols))))
        .show(false);
        /**
         * run-1
         * +---------+--------+---+------+
         * |firstname|lastname|age|number|
         * +---------+--------+---+------+
         * |John     |Doe     |21 |1     |
         * |John.    |Doe.    |21 |2     |
         * |Mary.    |William.|30 |3     |
         * +---------+--------+---+------+
         * run-2
         * +---------+--------+---+------+
         * |firstname|lastname|age|number|
         * +---------+--------+---+------+
         * |John     |Doe     |21 |1     |
         * |John.    |Doe.    |21 |2     |
         * |Mary.    |William.|30 |3     |
         * +---------+--------+---+------+
         * run-3
         * +---------+--------+---+------+
         * |firstname|lastname|age|number|
         * +---------+--------+---+------+
         * |John     |Doe     |21 |1     |
         * |John.    |Doe.    |21 |2     |
         * |Mary.    |William.|30 |3     |
         * +---------+--------+---+------+
         */
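If the single-partition sort behind the window becomes a bottleneck, one common workaround (not part of the original answer, just a sketch) is to assign the index with RDD.zipWithIndex after a global sort, which keeps the data distributed. It uses JavaRDD, RowFactory, StructType and scala.Tuple2 on top of what is already referenced above.

// sketch: consecutive 1-based index via zipWithIndex, reproducible across runs
// as long as the ordering over all columns is unambiguous
Dataset<Row> sorted = df2.orderBy(toScalaSeq(allCols));
StructType withIdSchema = sorted.schema().add("number", DataTypes.LongType);

JavaRDD<Row> indexed = sorted.javaRDD()
        .zipWithIndex()
        .map(t -> {
            Row row = t._1();
            Object[] values = new Object[row.size() + 1];
            for (int i = 0; i < row.size(); i++) {
                values[i] = row.get(i);
            }
            values[row.size()] = t._2() + 1; // 1-based, like row_number()
            return RowFactory.create(values);
        });

spark.createDataFrame(indexed, withIdSchema).show(false);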

Approach 3: use UUID.nameUUIDFromBytes

// name-based (type 3) UUID: the same input string always maps to the same UUID
UserDefinedFunction id_udf = udf((String s) ->
        UUID.nameUUIDFromBytes(
                s.getBytes(StandardCharsets.UTF_8)
        ).toString(), DataTypes.StringType);

df2.withColumn("stringId", id_udf.apply(concat_ws(":", toScalaSeq(allCols))))
        .show(false);
        /**
         * run-1
         * +---------+--------+---+------------------------------------+
         * |firstname|lastname|age|stringId                            |
         * +---------+--------+---+------------------------------------+
         * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
         * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
         * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
         * +---------+--------+---+------------------------------------+
         * run-2
         * +---------+--------+---+------------------------------------+
         * |firstname|lastname|age|stringId                            |
         * +---------+--------+---+------------------------------------+
         * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
         * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
         * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
         * +---------+--------+---+------------------------------------+
         * run-3
         * +---------+--------+---+------------------------------------+
         * |firstname|lastname|age|stringId                            |
         * +---------+--------+---+------------------------------------+
         * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
         * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
         * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
         * +---------+--------+---+------------------------------------+
         */
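UUID.nameUUIDFromBytes builds a name-based (type 3, MD5) UUID, which is why the values repeat across runs and across JVMs. A quick standalone check, using the first row's concatenated key from the output above:

String key = "John:Doe:21"; // firstname:lastname:age of the first row
System.out.println(UUID.nameUUIDFromBytes(key.getBytes(StandardCharsets.UTF_8)));
// prints 3d319fa5-7a48-3c21-bdb8-f4546a18dffb on every run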

Regarding the requirement that rows may contain duplicates, that duplicate rows should receive different ids, and that the ids should stay the same on subsequent runs: all of these approaches produce ids that are consistent across runs. Note, though, that approaches 1 and 3 derive the id purely from the row's column values, so two exactly identical rows will share the same id; only the row_number approach assigns distinct ids to exact duplicates.
Keep the helper method below handy for the Java-to-Scala conversion -

// Converts a java.util.List into a scala.collection.mutable.Buffer (a Seq),
// so it can be passed to Spark's Scala varargs APIs from Java.
<T> Buffer<T> toScalaSeq(List<T> list) {
    return JavaConversions.asScalaBuffer(list);
}
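scala.collection.JavaConversions is deprecated in Scala 2.12 and removed in 2.13. On a Scala 2.13 build of Spark, an equivalent helper (a sketch, assuming scala.jdk.javaapi.CollectionConverters, which ships with Scala 2.13+) would be:

// Scala 2.13+ variant of the same helper
<T> Buffer<T> toScalaSeq(List<T> list) {
    return scala.jdk.javaapi.CollectionConverters.asScala(list);
}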
