How to keep the same uuid across two separate DataFrames?

jucafojl posted 2021-05-27 in Spark

I have an input DataFrame df (with 10 columns, col1-col10) to which I add a new column uuid using the UDF below, producing another DataFrame newdf.
Next, from the newdf DataFrame I create two separate DataFrames, df1 (uuid, col1-col5) and df2 (uuid, col6-col10), each containing only those columns.
Here is the problem: the uuid column must be identical for corresponding rows, and unique per row, across both the df1 and df2 DataFrames.
Because Spark evaluates lazily, the UDF runs again each time df1 and df2 are written, so every row ends up with different uuid values in the df1 and df2 DataFrames.
The workaround I follow right now is to write the newdf DataFrame to a temp path first and then read it back, but this approach does not scale to large volumes of data.
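For reference, that temp-path workaround looks roughly like this (the temp path and the reloaded variable name are illustrative):

// Materializing newdf fixes the randomly generated uuids,
// at the cost of a full write and re-read of the data
newdf.write.format("parquet").save("/tmp/newdf/")
val materializeddf = spark.read.format("parquet").load("/tmp/newdf/")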
And here is the code snippet that shows the problem:

df.show(false)
+------+------+------+------+------+------+------+------+------+-------+
| col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 |
+------+------+------+------+------+------+------+------+------+-------+
| A1   | A2   | A3   | A4   | A5   | A6   | A7   | A8   | A9   |  A10  |
| B1   | B2   | B3   | B4   | B5   | B6   | B7   | B8   | B9   |  B10  |
| C1   | C2   | C3   | C4   | C5   | C6   | C7   | C8   | C9   |  C10  |
+------+------+------+------+------+------+------+------+------+-------+

import org.apache.spark.sql.functions.udf

// Non-deterministic UDF: returns a fresh random UUID on every invocation
val uuid = udf(() => java.util.UUID.randomUUID().toString)
val newdf = df.withColumn("uuid", uuid())

val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")

// Each save is a separate action, so the whole plan (UDF included) re-runs
df1.write.format("parquet").save("/df1/")
df2.write.format("parquet").save("/df2/")

df1.show()
+-----------------+------+------+------+------+------+
|     uuid        | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1   | A2   | A3   | A4   | A5   |
|1dbcecf-1304-4a4e| B1   | B2   | B3   | B4   | B5   |
|1vbdecf-8406-4a4e| C1   | C2   | C3   | C4   | C5   |
+-----------------+------+------+------+------+------+

df2.show()
+-----------------+------+------+------+------+------+
|     uuid        | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|2aodecf-3303-6n5e| A6   | A7   | A8   | A9   | A10  |
|2docecf-6305-6n5e| B6   | B7   | B8   | B9   | B10  |
|2vodecf-1406-6n5e| C6   | C7   | C8   | C9   | C10  |
+-----------------+------+------+------+------+------+

Expected output: the uuid values match across the two DataFrames, row for row

df1.show()
+-----------------+------+------+------+------+------+
|     uuid        | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1   | A2   | A3   | A4   | A5   |
|1dbcecf-1304-4a4e| B1   | B2   | B3   | B4   | B5   |
|1vbdecf-8406-4a4e| C1   | C2   | C3   | C4   | C5   |
+-----------------+------+------+------+------+------+

df2.show()
+-----------------+------+------+------+------+------+
|     uuid        | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A6   | A7   | A8   | A9   | A10  |
|1dbcecf-1304-4a4e| B6   | B7   | B8   | B9   | B10  |
|1vbdecf-8406-4a4e| C6   | C7   | C8   | C9   | C10  |
+-----------------+------+------+------+------+------+

Please suggest the best way to overcome this problem.

omtl5h9j1#

Try this.
The solution uses the Spark Scala API:
use UUID.nameUUIDFromBytes as the UDF.
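UUID.nameUUIDFromBytes builds a name-based (version 3, MD5) UUID, so the same input bytes always produce the same uuid on every run and every executor; that is what makes the value reproducible across separate actions. A minimal standalone check (the sample string is illustrative):

import java.util.UUID
import java.nio.charset.StandardCharsets

// Same bytes in, same version-3 UUID out, deterministically
val a = UUID.nameUUIDFromBytes("A1:A2:A3".getBytes(StandardCharsets.UTF_8))
val b = UUID.nameUUIDFromBytes("A1:A2:A3".getBytes(StandardCharsets.UTF_8))
assert(a == b)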

Load the test data provided:

    df.show(false)
    df.printSchema()
    /**
      * +----+----+----+----+----+----+----+----+----+-----+
      * |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|
      * +----+----+----+----+----+----+----+----+----+-----+
      * |A1  |A2  |A3  |A4  |A5  |A6  |A7  |A8  |A9  |A10  |
      * |B1  |B2  |B3  |B4  |B5  |B6  |B7  |B8  |B9  |B10  |
      * |C1  |C2  |C3  |C4  |C5  |C6  |C7  |C8  |C9  |C10  |
      * +----+----+----+----+----+----+----+----+----+-----+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: string (nullable = true)
      * |-- col3: string (nullable = true)
      * |-- col4: string (nullable = true)
      * |-- col5: string (nullable = true)
      * |-- col6: string (nullable = true)
      * |-- col7: string (nullable = true)
      * |-- col8: string (nullable = true)
      * |-- col9: string (nullable = true)
      * |-- col10: string (nullable = true)
      */

Generate the same uuid in both DataFrames:

    import java.util.UUID
    import java.nio.charset.StandardCharsets
    import org.apache.spark.sql.functions.{col, concat_ws, udf}

    // Name-based (version 3) UUID over the row content: concat_ws joins
    // every column value with ":" so the uuid is a pure function of the row
    val uuid = udf((s: String) => UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8)).toString)
    val newdf = df.withColumn("uuid", uuid(concat_ws(":", df.columns.map(col): _*)))
    val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
    val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")
    df1.show(false)
    /**
      * +------------------------------------+----+----+----+----+----+
      * |uuid                                |col1|col2|col3|col4|col5|
      * +------------------------------------+----+----+----+----+----+
      * |0c26ece0-708a-3105-896f-e70d18b67766|A1  |A2  |A3  |A4  |A5  |
      * |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B1  |B2  |B3  |B4  |B5  |
      * |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C1  |C2  |C3  |C4  |C5  |
      * +------------------------------------+----+----+----+----+----+
      */

    df2.show(false)
    /**
      * +------------------------------------+----+----+----+----+-----+
      * |uuid                                |col6|col7|col8|col9|col10|
      * +------------------------------------+----+----+----+----+-----+
      * |0c26ece0-708a-3105-896f-e70d18b67766|A6  |A7  |A8  |A9  |A10  |
      * |0e19058c-3c14-3d2f-8c71-b7308f63b0d6|B6  |B7  |B8  |B9  |B10  |
      * |eef9969b-3650-31f5-b877-d5e86ce7b1b7|C6  |C7  |C8  |C9  |C10  |
      * +------------------------------------+----+----+----+----+-----+
      */
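One caveat on this design: because the uuid is derived purely from the row's content, two identical rows will receive the same uuid, so uniqueness holds only as long as the concatenated column values are unique per row. If duplicate rows can occur, some additional distinguishing input to the hash would be needed; what that input should be depends on the data and is not covered by the snippet above.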
