我有一个从csv文件读取的sparkDataframe,如下所示:
df = ss.read \
.format("csv") \
.option("delimiter", ";") \
.option("header", "false") \
.option("inferSchema", "true") \
.option("escape", "\"") \
.option("multiline", "true") \
.option("wholeFile", "true") \
.load(file_path)
Dataframe如下所示:
|cod_cli|article_name|rank|
|123 |art_1 |1 |
|123 |art_2 |2 |
|123 |art_3 |3 |
|456 |art_4 |1 |
|456 |art_5 |2 |
|456 |art_6 |3 |
我想按column cod\u cli对元素进行分组,并创建多个列,每个列对应于分组集中的每个产品,作为一个值,一个dictionary key value,其中key作为列名,value作为与该列名相关的值,如下所示:
|cod_cli|Product 1 |Product 2 |Product 3 |
|123 |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456 |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
字典值可以是字符串(更好)或Map。我试着这样做:
df = df \
.groupBy(F.col("cod_cli")) \
.agg(F.collect_list(F.array("cod_art","rank")))
但是通过这种方式,我创建了一个包含所有分组元素的数组列的列。
有人能帮我吗?
谢谢您
更新
建议的解决方案是:
df = df.withColumn(
"Product",
F.to_json(
F.struct(F.col("cod_art"), F.col("rank"))
)
)
通过这种方式,我创建了一个列“product”,其中包含所需的json字符串,例如 {cod_art : art_1, rank : 1}
.
然后:
df = df \
.groupBy(F.col("cod_cli")) \
.pivot("rank") \
.agg(F.first("Product"))
通过这种方式,我可以为每个产品创建一个列,按cod\u cli属性分组,并处理将3个以上产品作为列的情况:
|cod_cli|1 |2 |3
|123 |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456 |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
2条答案
按热度按时间wa7juj8i1#
也许这是有用的-
加载提供的数据
使用pivot和first创建指定的列(应该在pyspark中实现,只需做最小的更改所有的都是pyspark.sql.functions)
1szpjjfi2#
你可以不用
pivot
(昂贵的操作),使用collect_list
的struct
,那么to_json
与create_map
.