是否有PySpark函数可以合并同一列中具有相同id的行的数据?

xlpyo6sf  于 2023-02-18  发布在  Spark
关注(0)|答案(2)|浏览(166)

我有以下 Dataframe :

+---+---+
| A | B |
+---+---+
| 1 | a |
| 1 | b |
| 1 | c |
| 2 | f |
| 2 | g |
| 3 | j |
+---+---+

我需要它是df/rdd格式

(1, [a, b, c])
(2, [f, g])
(3, [j])

我是Spark的新手,想知道是否可以通过单个函数执行此操作
我尝试使用flatmap,但我认为我没有正确使用它

eblbsuwk

eblbsuwk1#

您可以按“A”分组,然后使用聚合函数,例如collect_set或collect_array

import pyspark.sql.functions as F

df = [
    {"A": 1, "B": "a"},
    {"A": 1, "B": "b"},
    {"A": 1, "B": "c"},
    {"A": 2, "B": "f"},
    {"A": 2, "B": "g"},
    {"A": 3, "B": "j"}
]

df = spark.createDataFrame(df)
df.groupBy("A").agg(F.collect_set(F.col("B"))).show()

产出

+---+--------------+
|  A|collect_set(B)|
+---+--------------+
|  1|     [c, b, a]|
|  2|        [g, f]|
|  3|           [j]|
+---+--------------+
0lvr5msh

0lvr5msh2#

第一步,创建示例数据。

#
# 1 - Create sample dataframe + view
#

# array of tuples - data
dat1 = [
    (1, "a"),
    (1, "b"),
    (1, "c"),
    (2, "f"),
    (2, "g"),
    (3, "j")  
]

# array of names - columns
col1 = ["A", "B"]

# make data frame
df1 = spark.createDataFrame(data=dat1, schema=col1)

# make temp hive view
df1.createOrReplaceTempView("sample_data")

第二步,摆弄临时table。

%sql
select * from sample_data

%sql
select A, collect_list(B) as B_LIST from sample_data group by A

最后一步,编写代码来执行Spark SQL以创建所需的 Dataframe 。

df2 = spark.sql("select A, collect_list(B) as B_LIST from sample_data group by A")
display(df2)

总之,你可以使用dataframe方法来创建相同的输出,但是Spark SQL看起来更干净,更有意义。

相关问题