我有一个Map表(帐户ID-日期-群集ID)
示例:
| 帐户标识|日期|群集ID|
| - ------| - ------| - ------|
| 1个|二〇一八年|A类|
| 第二章|二〇一八年|A类|
| 第二章|二○一九年|乙|
| 三个|二○一九年|乙|
| 1个|小行星2020| C级|
| 四个|小行星2020| C级|
| 1个|小行星2021| D级|
| 五个|小行星2021| A类|
规则:
- 相同的cluster_id意味着它是同一个用户,但仅在特定日期有效。因此,一个帐户可以在不同日期具有多个cluster_id
- 特定日期的cluster_id可以有多个account_id
- 一个cluster_id可以出现在多个日期,但不一定表示它们是同一个用户
使用此表,我希望创建account_id-user_idMap。
预期输出:
| 帐户标识|用户标识|
| - ------| - ------|
| 1个|1个|
| 第二章|1个|
| 三个|1个|
| 四个|1个|
| 五个|第二章|
- 实际的user_id并不重要,只要将正确的帐户链接在一起即可
我试着这样开始,但不认为这是正确的。开放的任何想法。
df_user_mapping = cluster_df.withColumn('intermediate_group_id', F.dense_rank().over(Window.orderBy('year','month','cluster_id')).selectExpr('account_id','month','year','cluster_id as group_id','intermediate_group_id')
inter_groups = df_user_mapping.groupBy('account_id').agg(F.collect_set('intermediate_group_id').alias('intermediate_groups')))
groups = df_user_mapping.groupBy('account_id','year','month').agg(F.collect_set('group_id').alias('groups')))
joined = groups.join(inter_groups, ['account_id'], 'outer')
inter_explode = joined.select('account_id','groups','year','month',F.explode('intermediate_groups').alias('intermediate_group_id'))
group_explode = inter_explode.select('account_id','intermediate_group_id','year','month',F.explode('groups').alias('group_id'))
first_ids = group_explode.withColumn('first_id_inter', F.min('account_id').over(Window.partitionBy('inter_group_id'))).withColumn('first_id_group', F.min('first_id_inter').over(Window.partitionBy('group_id','year','month'))).withColumn('first_id', F.min('first_id_group').over(Window.partitionBy('account_id')))
final = first_ids.selectExpr('account_id','first_id as user_id').distinct()
如果你有什么想法请告诉我!
初学者模板
example = [('1',1,2018,'A'),
('2',1,2018,'A'),
('2',1,2019,'B'),
('3',1,2019,'B'),
('1',1,2020,'C'),
('4',1,2020,'C'),
('1',1,2021,'D'),
('5',1,2021,'A'),
]
example_df = spark.createDataFrame(example, ['account_id','month','year','cluster_id'])
1条答案
按热度按时间kyks70gy1#
试试这个