我有以下 Dataframe :
+---+---+ | A | B | +---+---+ | 1 | a | | 1 | b | | 1 | c | | 2 | f | | 2 | g | | 3 | j | +---+---+
我需要它是df/rdd格式
(1, [a, b, c]) (2, [f, g]) (3, [j])
我是Spark的新手,想知道是否可以通过单个函数执行此操作我尝试使用flatmap,但我认为我没有正确使用它
eblbsuwk1#
您可以按“A”分组,然后使用聚合函数,例如collect_set或collect_array
import pyspark.sql.functions as F df = [ {"A": 1, "B": "a"}, {"A": 1, "B": "b"}, {"A": 1, "B": "c"}, {"A": 2, "B": "f"}, {"A": 2, "B": "g"}, {"A": 3, "B": "j"} ] df = spark.createDataFrame(df) df.groupBy("A").agg(F.collect_set(F.col("B"))).show()
产出
+---+--------------+ | A|collect_set(B)| +---+--------------+ | 1| [c, b, a]| | 2| [g, f]| | 3| [j]| +---+--------------+
0lvr5msh2#
第一步,创建示例数据。
# # 1 - Create sample dataframe + view # # array of tuples - data dat1 = [ (1, "a"), (1, "b"), (1, "c"), (2, "f"), (2, "g"), (3, "j") ] # array of names - columns col1 = ["A", "B"] # make data frame df1 = spark.createDataFrame(data=dat1, schema=col1) # make temp hive view df1.createOrReplaceTempView("sample_data")
第二步,摆弄临时table。
%sql select * from sample_data
%sql select A, collect_list(B) as B_LIST from sample_data group by A
最后一步,编写代码来执行Spark SQL以创建所需的 Dataframe 。
df2 = spark.sql("select A, collect_list(B) as B_LIST from sample_data group by A") display(df2)
总之,你可以使用dataframe方法来创建相同的输出,但是Spark SQL看起来更干净,更有意义。
2条答案
按热度按时间eblbsuwk1#
您可以按“A”分组,然后使用聚合函数,例如collect_set或collect_array
产出
0lvr5msh2#
第一步,创建示例数据。
第二步,摆弄临时table。
最后一步,编写代码来执行Spark SQL以创建所需的 Dataframe 。
总之,你可以使用dataframe方法来创建相同的输出,但是Spark SQL看起来更干净,更有意义。