PySpark: get related rows from a DataFrame's list-column values

kgqe7b3p asked on 2021-05-17 in Spark

I have a dataframe that has an ID column and a related array column; the array column holds the IDs of the records related to that row.

ID | NAME | RELATED_IDLIST
--------------------------
123 | mike | [345,456]
345 | alen | [789]
456 | sam  | [789,999]
789 | marc | [111]
555 | dan  | [333]

From the above, I need to build the relationship between each parent ID and all of its related child IDs, like this:

ID | NAME | RELATED_IDLIST
--------------------------
123 | mike | [345,456,789,999,111]
345 | alen | [789,111]
456 | sam  | [789,999,111]
789 | marc | [111]
555 | dan  | [333]

I need help figuring out the above.

xcitsw88 · 1#

Using self joins and window functions you can solve this. I've split the code into 5 steps. The algorithm is as follows:
1. Explode the array list so that each related ID becomes an individual record (no more arrays in the data)
2. Self-join the id column against the related column (renamed from the RELATED_IDLIST column)
3. Reduce records sharing the same a_id into one array, and the matched b_id records into another array
4. Merge the two array columns into one combined array and rank the resulting records by the size of each combined array, largest first
5. Select the records of rank 1
You can try the following code:


# importing necessary functions for later use

from pyspark.sql.functions import explode, col, collect_set, array_union, size
from pyspark.sql.functions import dense_rank, desc
from pyspark.sql.window import Window

# need to set crossJoin to True if the Spark version is < 3

spark.conf.set("spark.sql.crossJoin.enabled", True)

############### STEP 0 #####################################

# creating the above mentioned dataframe

id_cols = [123,345,456,789,555]
name_cols = ['mike','alen','sam','marc','dan']
related_idlist_cols = [[345,456],[789],[789,999],[111],[333]]

list_of_rows = [(each_0,each_1,each_2) for each_0, each_1, each_2 in zip(id_cols,name_cols,related_idlist_cols)]
cols_name = ['ID','NAME','RELATED_IDLIST']

# this will result in above mentioned dataframe

df = spark.createDataFrame(list_of_rows,cols_name)

############### STEP 1: Explode values  #####################################

# the explode function turns an array column into one row per element,
# so one record whose array has two elements becomes two records:
#
#                                     +-> 123, mike, 129
# 123, mike, explode(['129','9029']) -+
#                                     +-> 123, mike, 9029

df_1 = df.select(col('id'),col('name'),explode(df.RELATED_IDLIST).alias('related'))
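
# for the sample data above, df_1 now contains one row per (id, name, related) triple:
# (123, mike, 345), (123, mike, 456), (345, alen, 789), (456, sam, 789),
# (456, sam, 999), (789, marc, 111), (555, dan, 333)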

############### STEP 2 : Self Join with Data  #####################################

# creating dataframes with different column names, for joining them later

a = df_1.withColumnRenamed('id','a_id').withColumnRenamed('name','a_name').withColumnRenamed('related','a_related')
b = df_1.withColumnRenamed('id','b_id').withColumnRenamed('name','b_name').withColumnRenamed('related','b_related')

# this is a left outer self join: each related id is matched back against the id column

df_2 = a.join(b, a.a_related == b.b_id, how='left').orderBy(a.a_id)
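
# the left join keeps records whose related id has no matching ID row
# (e.g. 555's related id 333 never appears in the ID column, so its b_* columns are null)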

############### STEP 3 : create Array Lists #####################################

# using collect_set we can reduce values of a particular kind into one set (here, the 'related' records, grouped by 'id')

df_3 = df_2.select(
    'a_id', 'a_name',
    collect_set('a_related').over(Window.partitionBy(df_2.a_id)).alias('a_related_ids'),
    collect_set('b_related').over(Window.partitionBy(df_2.b_id)).alias('b_related_ids'))

# merging the two sets into one column and also calculating the resulting array size

df_4 = df_3.select('a_id','a_name',array_union('a_related_ids','b_related_ids').alias('combined_ids')).withColumn('size',size('combined_ids'))
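
# e.g. for a_id 123 this produces two candidate rows: one with combined_ids
# [345, 456, 789] (size 3, via b_id 345) and one with [345, 456, 789, 999]
# (size 4, via b_id 456); element order within each set may vary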

# ranking the records within each a_id by combined array size (largest first) so we can pick the best one

df_5 = df_4.select('a_id','a_name','combined_ids',dense_rank().over(Window.partitionBy('a_id').orderBy(desc('size'))).alias('rank'))

############### STEP 4 : Selecting Ideal Records  #####################################

# picking the records of rank 1; these still contain duplicates, so remove them with distinct() and order by id

df_6 = df_5.select('a_id','a_name','combined_ids').filter(df_5.rank == 1).distinct().orderBy('a_id')

############### STEP 5 #####################################

display(df_6)
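
A note on running this outside Databricks: display() is a Databricks notebook helper, so in a plain PySpark session you can print the result instead:

# equivalent output in a non-Databricks session
df_6.show(truncate=False)

Also worth knowing: each pass of this self join resolves one level of indirection. Tracing the sample data, 123 ends up with [345,456,789,999]; pulling in the deeper 111 (reachable only through 789) would require repeating steps 1-3 on the output.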
