Is there any function in PySpark that generates the same set of unique values (like numpy.arange()) which can be used as a common column between two DataFrames and later be used for a join?
wgxvkvu91#
Below are the approaches I can think of for generating a unique value from all of a row's column values.
Load the test data:

String data = " firstname| lastname| age\n" +
        " John | Doe | 21\n" +
        " John. | Doe. | 21\n" +
        " Mary. | William. | 30";
List<String> list1 = Arrays.stream(data.split(System.lineSeparator()))
        .map(s -> Arrays.stream(s.split("\\|"))
                .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                .collect(Collectors.joining(","))
        )
        .collect(Collectors.toList());
Dataset<Row> df2 = spark.read()
        .option("header", true)
        .option("inferSchema", true)
        .option("sep", ",")
        .csv(spark.createDataset(list1, Encoders.STRING()));
df2.show(false);
df2.printSchema();
/**
 * +---------+--------+---+
 * |firstname|lastname|age|
 * +---------+--------+---+
 * |John     |Doe     |21 |
 * |John.    |Doe.    |21 |
 * |Mary.    |William.|30 |
 * +---------+--------+---+
 *
 * root
 *  |-- firstname: string (nullable = true)
 *  |-- lastname: string (nullable = true)
 *  |-- age: integer (nullable = true)
 */
Create a list of all the columns in the DataFrame:

List<Column> allCols = Arrays.stream(df2.columns())
        .map(functions::col)
        .collect(Collectors.toList());
Approach 1: compute sha2 over all the columns

The Wikipedia page gives estimates of the collision probability. If you run the numbers, you'll find that all the hard disks ever produced on Earth couldn't hold enough 1 MB files to reach even a 0.01% chance of a SHA-256 collision. Basically, you can ignore that possibility. Note: this approach produces the ID in string format.
df2.withColumn("stringId", sha2(concat_ws(":", toScalaSeq(allCols)), 256))
        .show(false);
/**
 * run-1
 * +---------+--------+---+----------------------------------------------------------------+
 * |firstname|lastname|age|stringId                                                        |
 * +---------+--------+---+----------------------------------------------------------------+
 * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
 * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
 * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
 * +---------+--------+---+----------------------------------------------------------------+
 * run-2
 * +---------+--------+---+----------------------------------------------------------------+
 * |firstname|lastname|age|stringId                                                        |
 * +---------+--------+---+----------------------------------------------------------------+
 * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
 * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
 * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
 * +---------+--------+---+----------------------------------------------------------------+
 * run-3
 * +---------+--------+---+----------------------------------------------------------------+
 * |firstname|lastname|age|stringId                                                        |
 * +---------+--------+---+----------------------------------------------------------------+
 * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
 * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
 * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
 * +---------+--------+---+----------------------------------------------------------------+
 */
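As a quick plain-Java sanity check of the determinism (no Spark needed), the sketch below reproduces what the sha2(concat_ws(":", ...), 256) call above computes for a single row, using java.security.MessageDigest. The class and method names are just for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Sha256RowId {
    // Same idea as sha2(concat_ws(":", allCols), 256): join the column
    // values with ":" and hex-encode the SHA-256 digest of the result.
    static String rowId(String... cols) {
        try {
            String joined = String.join(":", cols);
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(joined.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }

    public static void main(String[] args) {
        // The id depends only on the row's values, so it is identical on every run
        System.out.println(rowId("John", "Doe", "21"));
    }
}
```

Because the hash depends only on the row's values, the resulting column is stable across runs and across both DataFrames, which is what makes it usable as a join key.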
Approach 2: use the row_number function

Performance will degrade on large datasets because the window has no partitionBy clause. Note: this approach produces the ID in number format.
df2.withColumn("number", row_number().over(Window.orderBy(toScalaSeq(allCols))))
        .show(false);
/**
 * run-1
 * +---------+--------+---+------+
 * |firstname|lastname|age|number|
 * +---------+--------+---+------+
 * |John     |Doe     |21 |1     |
 * |John.    |Doe.    |21 |2     |
 * |Mary.    |William.|30 |3     |
 * +---------+--------+---+------+
 * run-2
 * +---------+--------+---+------+
 * |firstname|lastname|age|number|
 * +---------+--------+---+------+
 * |John     |Doe     |21 |1     |
 * |John.    |Doe.    |21 |2     |
 * |Mary.    |William.|30 |3     |
 * +---------+--------+---+------+
 * run-3
 * +---------+--------+---+------+
 * |firstname|lastname|age|number|
 * +---------+--------+---+------+
 * |John     |Doe     |21 |1     |
 * |John.    |Doe.    |21 |2     |
 * |Mary.    |William.|30 |3     |
 * +---------+--------+---+------+
 */
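The determinism here comes purely from the total ordering: once the rows are sorted by all columns, positions 1..n are fixed no matter what order the rows arrive in. A minimal driver-side sketch of the same idea in plain Java (hypothetical names, rows represented as comma-joined strings):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RowNumberSketch {
    // Mirrors row_number().over(Window.orderBy(allCols)): impose a total
    // order on the rows, then assign 1..n. Unlike a hash of the values,
    // duplicate rows still receive distinct numbers.
    static List<String> number(List<String> rows) {
        List<String> sorted = new ArrayList<>(rows);
        Collections.sort(sorted);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            out.add((i + 1) + "|" + sorted.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Input order does not matter; the sort makes the numbering stable
        List<String> rows = List.of("Mary.,William.,30", "John,Doe,21", "John.,Doe.,21");
        System.out.println(number(rows));
    }
}
```

Note the caveat this implies: if two rows compare equal on every ordering column, which of them gets which number is not guaranteed, so the numbering is only fully deterministic when the ordering is a true total order.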
Approach 3: use UUID.nameUUIDFromBytes

UserDefinedFunction id_udf = udf(
        (String s) -> UUID.nameUUIDFromBytes(
                s.getBytes(StandardCharsets.UTF_8)
        ).toString(),
        DataTypes.StringType);
df2.withColumn("stringId", id_udf.apply(concat_ws(":", toScalaSeq(allCols))))
        .show(false);
/**
 * run-1
 * +---------+--------+---+------------------------------------+
 * |firstname|lastname|age|stringId                            |
 * +---------+--------+---+------------------------------------+
 * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
 * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
 * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
 * +---------+--------+---+------------------------------------+
 * run-2
 * +---------+--------+---+------------------------------------+
 * |firstname|lastname|age|stringId                            |
 * +---------+--------+---+------------------------------------+
 * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
 * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
 * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
 * +---------+--------+---+------------------------------------+
 * run-3
 * +---------+--------+---+------------------------------------+
 * |firstname|lastname|age|stringId                            |
 * +---------+--------+---+------------------------------------+
 * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
 * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
 * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
 * +---------+--------+---+------------------------------------+
 */
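It's worth noting why this is stable across runs: UUID.nameUUIDFromBytes produces a name-based (version 3, MD5-hashed) UUID, so the same input bytes always map to the same UUID, unlike UUID.randomUUID(). A standalone check, with illustrative class and method names:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class NameUuidCheck {
    // Same transformation the UDF applies to the concat_ws(":") result:
    // nameUUIDFromBytes hashes the bytes with MD5, so it is deterministic.
    static UUID rowUuid(String... cols) {
        return UUID.nameUUIDFromBytes(
                String.join(":", cols).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        UUID a = rowUuid("John", "Doe", "21");
        UUID b = rowUuid("John", "Doe", "21");
        // version() is 3 for name-based (MD5) UUIDs
        System.out.println(a + " equal=" + a.equals(b) + " version=" + a.version());
    }
}
```

This gives a shorter, UUID-shaped key than the 64-character SHA-256 hex string, at the cost of MD5's weaker collision resistance.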
Regarding the requirements - rows may contain duplicates, duplicate rows should generate different IDs, and on subsequent runs the IDs should stay the same: all of these approaches create IDs that are consistent and unique across runs. Keep the helper method below handy for the conversion:
<T> Buffer<T> toScalaSeq(List<T> list) {
    return JavaConversions.asScalaBuffer(list);
}