How can I group and sort this data with PySpark?

hk8txs48 posted on 2023-04-05 in Spark

I have CSV data listing guitar makers and the guitars they produce, including the guitar type:

vendor,product
fender,acoustic::paramount
fender,acoustic::newporter
fender,electric::telecaster
yamaha,acoustic::f335
yamaha,electric::pac012

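How can I produce the total number of guitars of each type for each vendor, sorted in descending order by the count of one type (acoustic)? For example: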

fender - acoustic:2, electric:1
yamaha - acoustic:1, electric:1

Any ideas are appreciated!

rekjcdws1#


Try the built-in groupBy, agg, collect_list, and map_from_arrays functions to get the desired output.


from pyspark.sql.functions import col, collect_list, count, map_from_arrays, regexp_replace, to_json

# assumes an active SparkSession named `spark`
df = spark.createDataFrame([('fender','acoustic::paramount'),('fender','acoustic::newporter'),('fender','electric::telecaster'),('yamaha','acoustic::f335'),('yamaha','electric::pac012')],['vendor','product'])

# strip everything from "::" onward with regexp_replace, leaving only the guitar type
# then group by vendor and type and count the rows
df1 = df.withColumn("product",regexp_replace(col("product"),"(::.*)","")).groupBy("vendor","product").agg(count("*").alias("cnt"))

# group by vendor and collect the types and the counts as two arrays
# map_from_arrays pairs the two arrays up into a map
# to_json renders that map as a string
# regexp_replace then strips the surrounding { and } from the string

df1.groupBy("vendor").agg(regexp_replace(to_json(map_from_arrays(collect_list(col("product")),collect_list(col("cnt")))),"[{}]","").alias("product")).show(10,False)
#+------+-------------------------+
#|vendor|product                  |
#+------+-------------------------+
#|yamaha|"acoustic":1,"electric":1|
#|fender|"acoustic":2,"electric":1|
#+------+-------------------------+


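If what you are after is sorting the structs, use **array_sort** together with the higher-order transform function to sort the nested array, then build a map from it. Note that array_sort compares structs field by field, so with product as the first field the entries are ordered by product name.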


from pyspark.sql.functions import col, collect_list, expr, struct

# collect each vendor's (product, cnt) pairs into an array of structs
df2 = df1.groupBy("vendor").agg(collect_list(struct(col("product"),col("cnt"))).alias("ms"))

# transform rebuilds each struct as (product, count); array_sort then orders the structs
# field by field, so by product name first and by count only on ties
# each struct is then turned into a one-entry map, rendered with to_json,
# and the [, ], { and } characters are stripped from the resulting string

df2.withColumn("sort_product",expr("array_sort(transform(ms,x->struct(x['product'] as product,x['cnt'] as count)))")).\
withColumn("sort_product",expr("regexp_replace(to_json(transform(sort_product,x -> map(x['product'],x['count']))),'[\\\\[\\\\]{}]','')")).\
drop("ms").\
show(10,False)

#+------+-------------------------+
#|vendor|sort_product             |
#+------+-------------------------+
#|yamaha|"acoustic":1,"electric":1|
#|fender|"acoustic":2,"electric":1|
#+------+-------------------------+
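To check the Spark result against the output requested in the question on a small sample, the same logic can be sketched in plain Python with no cluster. This is only a reference for the expected result, not a replacement for the Spark code; the helper names `count_by_type` and `format_sorted` are hypothetical.

```python
from collections import Counter, defaultdict

records = [
    ("fender", "acoustic::paramount"),
    ("fender", "acoustic::newporter"),
    ("fender", "electric::telecaster"),
    ("yamaha", "acoustic::f335"),
    ("yamaha", "electric::pac012"),
]


def count_by_type(records):
    """Count guitars per vendor and per type (the part before '::')."""
    counts = defaultdict(Counter)
    for vendor, product in records:
        guitar_type = product.split("::", 1)[0]
        counts[vendor][guitar_type] += 1
    return counts


def format_sorted(counts, sort_type="acoustic"):
    """Order vendors by the count of `sort_type`, descending, and format one line each."""
    # A Counter returns 0 for a missing type, so vendors without it sort last.
    ordered = sorted(counts.items(), key=lambda kv: kv[1][sort_type], reverse=True)
    return [
        f"{vendor} - " + ", ".join(f"{t}:{n}" for t, n in sorted(c.items()))
        for vendor, c in ordered
    ]


lines = format_sorted(count_by_type(records))
for line in lines:
    print(line)
```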
