在PySpark中反转层次结构顺序

i5desfxk  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(147)

我需要创建一个包含员工层次结构的数据框。前3列“Name”、“Position”、“Age”应保持原样,而其余各列需要按层次结构反转顺序。
在输入中,Raj向“Pos1”汇报,“Pos1”又向“Pos2”汇报,“Pos2”再向“Pos3”汇报,依此类推。职位最多可以到“Pos30”。如果员工只有2个层级,则后续各列为空值。
对于输出,我们需要颠倒层次结构:Pos3的值应该移到Pos1,其对应的列(Brid、Emid)也做同样的处理。

fdx2calv

fdx2calv1#

对于您的30组列,这是一种复杂但高效的方法:

  • Spark3.1+
# Reverse the hierarchy columns using the Python higher-order-function API
# (F.filter / F.exists with lambdas), listed here under "Spark3.1+".
# `F` is pyspark.sql.functions and `df` is the input DataFrame.

# Column-name prefixes; one column per prefix makes up a hierarchy level
# (e.g. Pos1 / Brid1 / Emid1).
cols = ['Pos', 'Brid', 'Emid']
# One array per level, iterating from level 30 down to level 1 so that the
# highest level comes first. The regex selects the columns named <prefix><i>.
groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(30, 0, -1)]
# Keep only the levels that hold at least one value that is neither null nor "".
filtered = F.filter(F.array(*groups), lambda x: F.exists(x, lambda y: ~(F.isnull(y) | (y == F.lit("")))))
df = df.select(
    "Name", "Position", "Age",
    # filtered[x] is the x-th surviving level and [i] its i-th field; the result
    # is renamed to <prefix><x+1>.
    # NOTE(review): assumes each level's columns appear in the DataFrame in the
    # same order as `cols` -- confirm against the real schema.
    *[filtered[x][i].alias(f"{c}{x+1}") for x in range(30) for i, c in enumerate(cols)],
    # Number of levels that survived the filter.
    F.size(filtered).alias("Hierarchy_level")
)
  • Spark2.4+
# Same transformation written with SQL higher-order functions through F.expr,
# listed here under "Spark2.4+".
# `F` is pyspark.sql.functions and `df` is the input DataFrame.

# Column-name prefixes; one column per prefix makes up a hierarchy level.
cols = ['Pos', 'Brid', 'Emid']
# One array per level, iterating from level 30 down to level 1 so that the
# highest level comes first.
groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(30, 0, -1)]
# Store the array of levels in a helper column so the SQL expression below can
# refer to it by name.
df = df.withColumn('_temp', F.array(*groups))
# Keep only the levels that hold at least one value that is neither null nor ''.
filtered = F.expr("filter(_temp, x -> exists(x, y -> !(isnull(y) or (y = ''))))")
df = df.select(
    "Name", "Position", "Age",
    # filtered[x] is the x-th surviving level and [i] its i-th field; the result
    # is renamed to <prefix><x+1>. '_temp' is not selected, so it is dropped.
    # NOTE(review): assumes each level's columns appear in the DataFrame in the
    # same order as `cols` -- confirm against the real schema.
    *[filtered[x][i].alias(f"{c}{x+1}") for x in range(30) for i, c in enumerate(cols)],
    # Number of levels that survived the filter.
    F.size(filtered).alias("Hierarchy_level")
)

仅使用3组列进行测试。
输入 Dataframe :

from pyspark.sql import functions as F
# Sample input with three hierarchy levels (Pos/Brid/Emid 1..3).
# Raj has all three levels filled; Ken has only two, with level 3 holding an
# empty string and two nulls.
# NOTE(review): `spark` is presumably an existing SparkSession -- it is not
# created in this snippet.
df = spark.createDataFrame(
    [("Raj", "Trainee", 30, "Associate", "G104", "100675284", "Consultant", "G105", "10078696", "Sr Consultant", "G106", "1837839"),
     ("Ken", "Associate", 31, "Consultant", "G105", "10078696", "Sr Consultant", "G106", "1837839", "", None, None)],
    ["Name", "Position", "Age", "Pos1", "Brid1", "Emid1", "Pos2", "Brid2", "Emid2", "Pos3", "Brid3", "Emid3"])
df.show()

# +----+---------+---+----------+-----+---------+-------------+-----+--------+-------------+-----+-------+

# |Name| Position|Age|      Pos1|Brid1|    Emid1|         Pos2|Brid2|   Emid2|         Pos3|Brid3|  Emid3|

# +----+---------+---+----------+-----+---------+-------------+-----+--------+-------------+-----+-------+

# | Raj|  Trainee| 30| Associate| G104|100675284|   Consultant| G105|10078696|Sr Consultant| G106|1837839|

# | Ken|Associate| 31|Consultant| G105| 10078696|Sr Consultant| G106| 1837839|             | null|   null|

# +----+---------+---+----------+-----+---------+-------------+-----+--------+-------------+-----+-------+

脚本和结果:

  • Spark3.1+
# Three-level test run of the lambda-based variant (listed under "Spark3.1+").

# Column-name prefixes; one column per prefix makes up a hierarchy level.
cols = ['Pos', 'Brid', 'Emid']
# One array per level, iterating from level 3 down to level 1 so that the
# highest level comes first.
groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(3, 0, -1)]
# Keep only the levels that hold at least one value that is neither null nor "".
filtered = F.filter(F.array(*groups), lambda x: F.exists(x, lambda y: ~(F.isnull(y) | (y == F.lit("")))))
df = df.select(
    "Name", "Position", "Age",
    # filtered[x] is the x-th surviving level and [i] its i-th field; the result
    # is renamed to <prefix><x+1>.
    *[filtered[x][i].alias(f"{c}{x+1}") for x in range(3) for i, c in enumerate(cols)],
    # Number of levels that survived the filter.
    F.size(filtered).alias("Hierarchy_level")
)
df.show()

# +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+

# |Name| Position|Age|         Pos1|Brid1|  Emid1|      Pos2|Brid2|   Emid2|     Pos3|Brid3|    Emid3|Hierarchy_level|

# +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+

# | Raj|  Trainee| 30|Sr Consultant| G106|1837839|Consultant| G105|10078696|Associate| G104|100675284|              3|

# | Ken|Associate| 31|Sr Consultant| G106|1837839|Consultant| G105|10078696|     null| null|     null|              2|

# +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+
  • Spark2.4+
# Three-level test run of the F.expr-based variant (listed under "Spark2.4+").
# This is an alternative to the lambda-based snippet: it expects `df` to be the
# original input DataFrame, not the already reversed one.

# Column-name prefixes; one column per prefix makes up a hierarchy level.
cols = ['Pos', 'Brid', 'Emid']
# One array per level, iterating from level 3 down to level 1 so that the
# highest level comes first.
groups = [F.array(df.colRegex(fr"`({'|'.join(cols)}){i}$`")) for i in range(3, 0, -1)]
# Store the array of levels in a helper column so the SQL expression below can
# refer to it by name.
df = df.withColumn('_temp', F.array(*groups))
# Keep only the levels that hold at least one value that is neither null nor ''.
filtered = F.expr("filter(_temp, x -> exists(x, y -> !(isnull(y) or (y = ''))))")
df = df.select(
    "Name", "Position", "Age",
    # filtered[x] is the x-th surviving level and [i] its i-th field; the result
    # is renamed to <prefix><x+1>. '_temp' is not selected, so it is dropped.
    *[filtered[x][i].alias(f"{c}{x+1}") for x in range(3) for i, c in enumerate(cols)],
    # Number of levels that survived the filter.
    F.size(filtered).alias("Hierarchy_level")
)

效率证明:

# Print the physical plan; the output below shows a single Project over the scan.
df.explain()

# == Physical Plan ==

# Project [Name#3697, Position#3698, Age#3699L, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3780, lambdafunction((isnotnull(lambda x_30#3790) AND NOT (lambda x_30#3790 = )), lambda x_30#3790, false)), lambda x_29#3780, false))[0][0] AS Pos1#3770, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3781, lambdafunction((isnotnull(lambda x_30#3791) AND NOT (lambda x_30#3791 = )), lambda x_30#3791, false)), lambda x_29#3781, false))[0][1] AS Brid1#3771, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3782, lambdafunction((isnotnull(lambda x_30#3792) AND NOT (lambda x_30#3792 = )), lambda x_30#3792, false)), lambda x_29#3782, false))[0][2] AS Emid1#3772, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3783, lambdafunction((isnotnull(lambda x_30#3793) AND NOT (lambda x_30#3793 = )), lambda x_30#3793, false)), lambda x_29#3783, false))[1][0] AS Pos2#3773, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3784, lambdafunction((isnotnull(lambda x_30#3794) AND NOT (lambda x_30#3794 = )), lambda x_30#3794, false)), lambda x_29#3784, false))[1][1] AS Brid2#3774, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3785, lambdafunction((isnotnull(lambda x_30#3795) AND NOT (lambda x_30#3795 = )), lambda x_30#3795, false)), lambda x_29#3785, 
false))[1][2] AS Emid2#3775, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3786, lambdafunction((isnotnull(lambda x_30#3796) AND NOT (lambda x_30#3796 = )), lambda x_30#3796, false)), lambda x_29#3786, false))[2][0] AS Pos3#3776, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3787, lambdafunction((isnotnull(lambda x_30#3797) AND NOT (lambda x_30#3797 = )), lambda x_30#3797, false)), lambda x_29#3787, false))[2][1] AS Brid3#3777, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3788, lambdafunction((isnotnull(lambda x_30#3798) AND NOT (lambda x_30#3798 = )), lambda x_30#3798, false)), lambda x_29#3788, false))[2][2] AS Emid3#3778, size(filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3789, lambdafunction((isnotnull(lambda x_30#3799) AND NOT (lambda x_30#3799 = )), lambda x_30#3799, false)), lambda x_29#3789, false)), true) AS Hierarchy_level#3779]

# +- *(1) Scan ExistingRDD[Name#3697,Position#3698,Age#3699L,Pos1#3700,Brid1#3701,Emid1#3702,Pos2#3703,Brid2#3704,Emid2#3705,Pos3#3706,Brid3#3707,Emid3#3708]

相关问题