我有一个带列的数据框架
| 瓦利德|查询键|
| - -----|- -----|
| ID1| qid1|
| ID2| ID1|
| ID3| qid3|
| ID4| ID2|
对于这个数据,我想创建一个runorder
列。questionkey
列具有来自id
列的id,因此在执行此依赖id 3之前,应首先执行id。
期望输出
| 瓦利德|查询键|运行指令|
| - -----|- -----|- -----|
| ID1| qid1| 1|
| ID2| ID1| 2|
| ID3| qid3| 1|
| ID4| ID2| 3|
我们可以做些什么来实现这一点使用pyspark?
我尝试使用case when condition,但递归操作我无法执行。
第一次将runorder赋值为1给所有变量
df_with_runorder = df.withColumn("runorder", lit(1)).withColumnRenamed("VarID","VarID_a")
#renamed the columns to before joining
df_with_runorder=df_with_runorder.withColumnRenamed("QuestionKey","QuestionKey_a")\ .withColumnRenamed("runorder","runorder_a")
#here performed joined to check dependency of values in varid and questionkey and if match found(dependency found) then updated runorder by adding 1 to it
joined_df = df_with_runorder.alias("a").join(df.alias("b"), col("a.varid_a") == col("b.QuestionKey"), "left")\ .withColumn("updated_runorder",when(col("VarID").isNotNull(),col("runorder_a")+1).otherwise(col("runorder_a"))) joined_df.show()
这将只更新一次,它将不检查每个依赖性,它将只检查立即依赖性并更新运行顺序,但是对于ID 4,存在与ID 2的2个依赖性,然后ID 1,它将只检查其依赖于D2并添加1,但是理想地,它应该检查ID 2也依赖于ID 1,首先将ID 2更新为2,然后将ID 4更新为3。
1条答案
按热度按时间5kgi1eie1#
我认为你需要一个迭代算法,这样你就可以创建这个依赖项计数