pyspark-按组添加行

wfsdck30  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(441)

在pyspark2.2中,我基本上是按用户添加行。
如果我的主Dataframe看起来像:

main_list = [["a","bb",5], ["d","cc",10],["d","bb",11]]
main_pd = pd.DataFrame(main_list, columns = ['user',"group", 'value'])
main_df = spark.createDataFrame(main_pd)
main_df.show()

+----+-----+-----+
|user|group|value|
+----+-----+-----+
|   a|   bb|    5|
|   d|   cc|   10|
|   d|   bb|   11|
+----+-----+-----+

然后我有一个关键的Dataframe,我希望每个用户都有每个组的值
用户 d 组中有一行 bb 以及 cc . 我想要一个用户 a 拥有同样的东西。

key_list = [["bb",10],["cc",17]]
key_pd = pd.DataFrame(key_list, columns = ['group', 'value'])
key_df = spark.createDataFrame(key_pd)

main_df.join(key_df, ["group"], how ="outer").show()

但我的结果是:

+-----+----+-----+-----+
|group|user|value|value|
+-----+----+-----+-----+
|   cc|   d|   10|   17|
|   bb|   a|    5|   10|
|   bb|   d|   11|   10|
+-----+----+-----+-----+

以下是每个Dataframe的模式:

main_df.printSchema()
root
 |-- user: string (nullable = true)
 |-- group: string (nullable = true)
 |-- value: long (nullable = true)

key_df.printSchema()
root
 |-- group: string (nullable = true)
 |-- value: long (nullable = true)

基本上我希望结果是:

+-----+----+-----+-----+
|group|user|value|value|
+-----+----+-----+-----+
|   cc|   d|   10|   17|
|   bb|   a|    5|   10|
|   cc|   a| Null|   17|
|   bb|   d|   11|   10|
+-----+----+-----+-----+

我不认为完整的外部连接可以用 coalesce 所以我也尝试了 row_number/rank

gajydyqb

gajydyqb1#

使用 cross join ,然后使用 left joinmaind_df 生成缺少的行,然后 left join 键为的结果。

users = main_df.select("user").distinct()
groups = main_df.select("group").distinct()
user_group = users.crossJoin(groups)

all_combs = user_group.join(main_df, (main_df.user == user_group.user) & (main_df.group == user_group.group), "left").select(user_group.user,user_group.group,main_df.value)
all_combs.join(key_df, key_df.group == all_combs.group, "left").show()

相关问题