I have an input DataFrame df (with 10 columns, col1-col10) to which I add a new uuid column using the UDF below, producing another DataFrame newdf. Next, from the newdf DataFrame I create two separate DataFrames, df1 (uuid, col1-col5) and df2 (uuid, col6-col10), each containing only those columns.
Here is the problem: I need the uuid column to be the same and unique for corresponding rows in both the df1 and df2 DataFrames.
Because Spark uses lazy evaluation, the UDF is re-executed when df1 and df2 are written, so each row ends up with a different uuid value in the df1 and df2 DataFrames.
The workaround I follow for now is to write the newdf DataFrame to a temp path first and then read it back, but that logic is not good for large volumes of data.
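For reference, a minimal sketch of that temp-path workaround (the /tmp/newdf path and parquet format are illustrative assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Materialize newdf once so the random UUIDs are computed exactly once;
// everything derived from the re-read DataFrame then shares the same values.
newdf.write.format("parquet").save("/tmp/newdf/")  // illustrative path
val materialized = spark.read.format("parquet").load("/tmp/newdf/")

val df1 = materialized.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = materialized.select("uuid", "col6", "col7", "col8", "col9", "col10")

This works because the UUIDs are frozen on disk, but it costs a full extra write and read of the data, which is exactly the overhead I want to avoid.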
Below is a snippet of the current (problematic) code:
df.show(false)
+------+------+------+------+------+------+------+------+------+-------+
| col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8 | col9 | col10 |
+------+------+------+------+------+------+------+------+------+-------+
| A1 | A2 | A3 | A4 | A5 | A6 | A7 | A8 | A9 | A10 |
| B1 | B2 | B3 | B4 | B5 | B6 | B7 | B8 | B9 | B10 |
| C1 | C2 | C3 | C4 | C5 | C6 | C7 | C8 | C9 | C10 |
+------+------+------+------+------+------+------+------+------+-------+
import org.apache.spark.sql.functions.udf

// Non-deterministic UDF: every evaluation returns a fresh random UUID.
val uuid = udf(() => java.util.UUID.randomUUID().toString)
val newdf = df.withColumn("uuid", uuid())
val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")
df1.write.format("parquet").save("/df1/")
df2.write.format("parquet").save("/df2/")
df1.show()
+-----------------+------+------+------+------+------+
| uuid | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1 | A2 | A3 | A4 | A5 |
|1dbcecf-1304-4a4e| B1 | B2 | B3 | B4 | B5 |
|1vbdecf-8406-4a4e| C1 | C2 | C3 | C4 | C5 |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
| uuid | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|2aodecf-3303-6n5e| A6 | A7 | A8 | A9 | A10 |
|2docecf-6305-6n5e| B6 | B7 | B8 | B9 | B10 |
|2vodecf-1406-6n5e| C6 | C7 | C8 | C9 | C10 |
+-----------------+------+------+------+------+------+
Expected output: the uuid values match across corresponding rows of the two DataFrames.
df1.show()
+-----------------+------+------+------+------+------+
| uuid | col1 | col2 | col3 | col4 | col5 |
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A1 | A2 | A3 | A4 | A5 |
|1dbcecf-1304-4a4e| B1 | B2 | B3 | B4 | B5 |
|1vbdecf-8406-4a4e| C1 | C2 | C3 | C4 | C5 |
+-----------------+------+------+------+------+------+
df2.show()
+-----------------+------+------+------+------+------+
| uuid | col6 | col7 | col8 | col9 | col10|
+-----------------+------+------+------+------+------+
|1abdecf-8303-4a4e| A6 | A7 | A8 | A9 | A10 |
|1dbcecf-1304-4a4e| B6 | B7 | B8 | B9 | B10 |
|1vbdecf-8406-4a4e| C6 | C7 | C8 | C9 | C10 |
+-----------------+------+------+------+------+------+
Please suggest the best way to overcome this issue.
1 Answer

omtl5h9j1#
Try this. The solution uses the Spark Scala API: use UUID.nameUUIDFromBytes in a UDF, so that loading the provided test data produces the same uuid for the same row content every time.
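A minimal sketch of that idea (concatenating all ten columns with a separator as the UUID seed is an assumption; any stable, unique per-row key would do):

import java.util.UUID
import org.apache.spark.sql.functions._

// Deterministic, name-based UUID: the same input bytes always map to the
// same UUID, so re-evaluations caused by lazy execution are harmless.
val nameUuid = udf((key: String) =>
  UUID.nameUUIDFromBytes(key.getBytes("UTF-8")).toString)

// Seed the UUID with the full row content (assumed key; adjust as needed).
val newdf = df.withColumn("uuid",
  nameUuid(concat_ws("|", df.columns.map(col): _*)))

val df1 = newdf.select("uuid", "col1", "col2", "col3", "col4", "col5")
val df2 = newdf.select("uuid", "col6", "col7", "col8", "col9", "col10")

Because the uuid is now a pure function of the row's content, Spark may recompute the UDF for df1 and df2 as often as it likes and both DataFrames will still agree. Note the trade-off: duplicate input rows receive duplicate UUIDs, so this only yields unique values when the seed columns are unique per row.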