I have a PySpark DataFrame like this:
data = [ {"master_record_id": "001-0073-683496",
'dob': datetime.date(2000, 1, 1),
"patient_ssn": "123456789",
"dodid": "1234567891",
"dqi_id":"123",
"site_id":700},
{"master_record_id": "001-0013-101321",
'dob': datetime.date(2000, 1, 1),
"patient_ssn": "123456789",
"dodid": "1234567891",
"dqi_id":"123",
"site_id":701},
{"master_record_id": "001-0046-2845712",
'dob': datetime.date(1999, 2, 3),
"patient_ssn": "987654321",
"dodid": "0987654322",
"dqi_id":None,
"site_id":775},
{"master_record_id": "001-0048-2845712",
'dob': datetime.date(1999, 2, 3),
"patient_ssn": "987654321",
"dodid": "0987654322",
"dqi_id":None,
"site_id":775}]
df = spark.createDataFrame(data=data)
I would like to use a window function to assign a uuid to records that share the same dodid, patient_ssn, and dob.
I currently have a working solution, but it does not scale to millions of records. I believe what slows it down is looping over the clusters and creating several DataFrames. Is there a way to assign a uuid directly in the Window function? My working but inefficient solution is below:
import uuid
from functools import reduce

from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as f
from pyspark.sql.functions import col

# Filter out any records with null dob/patient_ssn/dodid
df = df.filter("dob IS NOT NULL AND patient_ssn IS NOT NULL AND dodid IS NOT NULL")

# Create a cluster id based on dob/patient_ssn/dodid
window = Window.orderBy(["dob", "patient_ssn", "dodid"])
df = df.withColumn("cluster_id", f.dense_rank().over(window) - 1)

# Collect the distinct cluster ids to the driver
cluster_list = set(df.select("cluster_id").rdd.flatMap(lambda x: x).collect())

df_list = []
# Iterate through clusters, assigning a uuid to each cluster.
# Each cluster becomes a new DataFrame.
for cluster in cluster_list:
    temp = df.filter(col("cluster_id") == cluster)
    df_list.append(temp.withColumn("uuid", f.lit(str(uuid.uuid1()))))

# Union all DataFrames from df_list
df = reduce(DataFrame.unionAll, df_list)
1 Answer
I don't see a reason to use a Window function here. How about concatenating all three "key" columns and running md5 over the result?
.withColumn('uuid', md5(concat('dodid', 'patient_ssn', 'dob')))
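A minimal runnable sketch of that suggestion, assuming the column names from the question; casting dob to a string and using concat_ws with a separator are my additions, since the hash should be computed over string input and a separator avoids accidental collisions between adjacent fields:

```python
from pyspark.sql import functions as f

# Deterministic hash per (dodid, patient_ssn, dob) group: records with the same
# key values get the same value, with no loop over clusters and no driver-side collect.
df = df.withColumn(
    "uuid",
    f.md5(
        f.concat_ws(
            "|",                          # separator between key fields
            f.col("dodid"),
            f.col("patient_ssn"),
            f.col("dob").cast("string"),  # dob is a date, cast before hashing
        )
    ),
)

df.select("master_record_id", "site_id", "uuid").show(truncate=False)
```

Note that md5 yields a 32-character hex digest rather than an RFC 4122 UUID. If a UUID-shaped string is required, the digest can be reformatted with hyphens (it has the right number of hex characters), or a deterministic uuid.uuid5 could be generated in a Python UDF at the cost of running Python per row.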