需要Pyspark解决方案通过检查列之间的依赖关系来生成运行顺序

imzjd6km  于 2023-06-21  发布在  Spark
关注(0)|答案(1)|浏览(120)

我有一个带列的数据框架
| 瓦利德|查询键|
| - -----|- -----|
| ID1| qid1|
| ID2| ID1|
| ID3| qid3|
| ID4| ID2|
对于这个数据,我想创建一个runorder列。questionkey列具有来自id列的id,因此在执行此依赖id 3之前,应首先执行id。
期望输出
| 瓦利德|查询键|运行指令|
| - -----|- -----|- -----|
| ID1| qid1| 1|
| ID2| ID1| 2|
| ID3| qid3| 1|
| ID4| ID2| 3|
我们可以做些什么来实现这一点使用pyspark?
我尝试使用case when condition,但递归操作我无法执行。

第一次将runorder赋值为1给所有变量

df_with_runorder = df.withColumn("runorder", lit(1)).withColumnRenamed("VarID","VarID_a") 
#renamed the columns to before joining
df_with_runorder=df_with_runorder.withColumnRenamed("QuestionKey","QuestionKey_a")\ .withColumnRenamed("runorder","runorder_a") 

#here performed joined to check dependency of values in varid and questionkey and if match found(dependency found) then updated runorder by adding 1 to it

joined_df = df_with_runorder.alias("a").join(df.alias("b"), col("a.varid_a") == col("b.QuestionKey"), "left")\ .withColumn("updated_runorder",when(col("VarID").isNotNull(),col("runorder_a")+1).otherwise(col("runorder_a"))) joined_df.show()

这将只更新一次,它将不检查每个依赖性,它将只检查立即依赖性并更新运行顺序,但是对于ID 4,存在与ID 2的2个依赖性,然后ID 1,它将只检查其依赖于D2并添加1,但是理想地,它应该检查ID 2也依赖于ID 1,首先将ID 2更新为2,然后将ID 4更新为3。

5kgi1eie

5kgi1eie1#

我认为你需要一个迭代算法,这样你就可以创建这个依赖项计数

from pyspark.sql.functions import col, lit

data = [("id1", "qid1"),
        ("id2", "id1"),
        ("id3", "qid3"),
        ("id4", "id2")]

df = spark.createDataFrame(data, ["id", "questionkey"])
df = df.withColumn('runOrder', lit(1)) # Set default order as 1 for all

index = 1
while True:
    qdf = df.filter(df.runOrder == index).select(df.id).alias("qdf")
    if qdf.count() == 0:
        # If no records with new changes from the last iteration then break
        break;
    index += 1
    df = df.join(qdf, df.questionkey == qdf.id, "left_outer")
    df = df.withColumn('runOrder', when(col('qdf.id').isNotNull(), df.runOrder + 1).otherwise(df.runOrder)).drop(col('qdf.id'))
    df.cache()

df.orderBy('id').show()
+---+-----------+--------+
| id|questionkey|runOrder|
+---+-----------+--------+
|id1|       qid1|       1|
|id2|        id1|       2|
|id3|       qid3|       1|
|id4|        id2|       3|
+---+-----------+--------+

相关问题