# Compact the numbered (Pos, Brid, Emid) column groups: drop every group whose
# values are all null/empty, shift the surviving groups to the front
# (highest-numbered source group first) and count how many survive.
# This variant uses the Python-lambda form of F.filter / F.exists.
cols = ['Pos', 'Brid', 'Emid']
n_groups = 30  # number of numbered groups (Pos1..Pos30, Brid1..Brid30, ...)
# Build each group from explicit column names, in the order of `cols`, so that
# element i of a group always corresponds to cols[i] no matter how the columns
# are physically ordered in the DataFrame (a regex match would follow schema
# order and could silently mislabel values).
groups = [
    F.array(*[F.col(f"{c}{i}") for c in cols])
    for i in range(n_groups, 0, -1)
]
# Keep only the groups that contain at least one value that is neither null
# nor the empty string.
filtered = F.filter(
    F.array(*groups),
    lambda x: F.exists(x, lambda y: ~(F.isnull(y) | (y == F.lit("")))),
)
df = df.select(
    "Name", "Position", "Age",
    # Re-expand the compacted array into flat columns named <col><level>.
    # NOTE(review): levels beyond size(filtered) are expected to come out as
    # null; confirm this holds if ANSI mode is enabled.
    *[
        filtered[x][i].alias(f"{c}{x+1}")
        for x in range(n_groups)
        for i, c in enumerate(cols)
    ],
    F.size(filtered).alias("Hierarchy_level"),
)
Spark 2.4+ 版本:
# Same compaction of the numbered (Pos, Brid, Emid) column groups, written
# with SQL higher-order functions via F.expr instead of Python lambdas.
cols = ['Pos', 'Brid', 'Emid']
n_groups = 30  # number of numbered groups (Pos1..Pos30, Brid1..Brid30, ...)
# Build each group from explicit column names, in the order of `cols`, so that
# element i of a group always corresponds to cols[i] no matter how the columns
# are physically ordered in the DataFrame (a regex match would follow schema
# order and could silently mislabel values).
groups = [
    F.array(*[F.col(f"{c}{i}") for c in cols])
    for i in range(n_groups, 0, -1)
]
# The SQL expression below refers to the array by name, so it has to exist as
# a real column first; the final select drops it again.
df = df.withColumn('_temp', F.array(*groups))
# Keep only the groups that contain at least one value that is neither null
# nor the empty string.
filtered = F.expr("filter(_temp, x -> exists(x, y -> !(isnull(y) or (y = ''))))")
df = df.select(
    "Name", "Position", "Age",
    # Re-expand the compacted array into flat columns named <col><level>.
    # NOTE(review): levels beyond size(filtered) are expected to come out as
    # null; confirm this holds if ANSI mode is enabled.
    *[
        filtered[x][i].alias(f"{c}{x+1}")
        for x in range(n_groups)
        for i, c in enumerate(cols)
    ],
    F.size(filtered).alias("Hierarchy_level"),
)
# Reduced test of the group compaction with only 3 numbered groups, using the
# Python-lambda form of F.filter / F.exists.
cols = ['Pos', 'Brid', 'Emid']
n_groups = 3  # number of numbered groups (Pos1..Pos3, Brid1..Brid3, ...)
# Build each group from explicit column names, in the order of `cols`, so that
# element i of a group always corresponds to cols[i] no matter how the columns
# are physically ordered in the DataFrame (a regex match would follow schema
# order and could silently mislabel values).
groups = [
    F.array(*[F.col(f"{c}{i}") for c in cols])
    for i in range(n_groups, 0, -1)
]
# Keep only the groups that contain at least one value that is neither null
# nor the empty string.
filtered = F.filter(
    F.array(*groups),
    lambda x: F.exists(x, lambda y: ~(F.isnull(y) | (y == F.lit("")))),
)
df = df.select(
    "Name", "Position", "Age",
    # Re-expand the compacted array into flat columns named <col><level>.
    # NOTE(review): levels beyond size(filtered) are expected to come out as
    # null; confirm this holds if ANSI mode is enabled.
    *[
        filtered[x][i].alias(f"{c}{x+1}")
        for x in range(n_groups)
        for i, c in enumerate(cols)
    ],
    F.size(filtered).alias("Hierarchy_level"),
)
# Print the resulting rows; the captured output is reproduced in the comment below.
df.show()
# +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+
# |Name| Position|Age| Pos1|Brid1| Emid1| Pos2|Brid2| Emid2| Pos3|Brid3| Emid3|Hierarchy_level|
# +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+
# | Raj| Trainee| 30|Sr Consultant| G106|1837839|Consultant| G105|10078696|Associate| G104|100675284| 3|
# | Ken|Associate| 31|Sr Consultant| G106|1837839|Consultant| G105|10078696| null| null| null| 2|
# +----+---------+---+-------------+-----+-------+----------+-----+--------+---------+-----+---------+---------------+
Spark 2.4+ 版本:
# Reduced test of the group compaction with only 3 numbered groups, written
# with SQL higher-order functions via F.expr instead of Python lambdas.
cols = ['Pos', 'Brid', 'Emid']
n_groups = 3  # number of numbered groups (Pos1..Pos3, Brid1..Brid3, ...)
# Build each group from explicit column names, in the order of `cols`, so that
# element i of a group always corresponds to cols[i] no matter how the columns
# are physically ordered in the DataFrame (a regex match would follow schema
# order and could silently mislabel values).
groups = [
    F.array(*[F.col(f"{c}{i}") for c in cols])
    for i in range(n_groups, 0, -1)
]
# The SQL expression below refers to the array by name, so it has to exist as
# a real column first; the final select drops it again.
df = df.withColumn('_temp', F.array(*groups))
# Keep only the groups that contain at least one value that is neither null
# nor the empty string.
filtered = F.expr("filter(_temp, x -> exists(x, y -> !(isnull(y) or (y = ''))))")
df = df.select(
    "Name", "Position", "Age",
    # Re-expand the compacted array into flat columns named <col><level>.
    # NOTE(review): levels beyond size(filtered) are expected to come out as
    # null; confirm this holds if ANSI mode is enabled.
    *[
        filtered[x][i].alias(f"{c}{x+1}")
        for x in range(n_groups)
        for i, c in enumerate(cols)
    ],
    F.size(filtered).alias("Hierarchy_level"),
)
效率证明:
# Print the physical plan; the captured plan is reproduced in the comment below.
df.explain()
# == Physical Plan ==
# Project [Name#3697, Position#3698, Age#3699L, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3780, lambdafunction((isnotnull(lambda x_30#3790) AND NOT (lambda x_30#3790 = )), lambda x_30#3790, false)), lambda x_29#3780, false))[0][0] AS Pos1#3770, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3781, lambdafunction((isnotnull(lambda x_30#3791) AND NOT (lambda x_30#3791 = )), lambda x_30#3791, false)), lambda x_29#3781, false))[0][1] AS Brid1#3771, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3782, lambdafunction((isnotnull(lambda x_30#3792) AND NOT (lambda x_30#3792 = )), lambda x_30#3792, false)), lambda x_29#3782, false))[0][2] AS Emid1#3772, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3783, lambdafunction((isnotnull(lambda x_30#3793) AND NOT (lambda x_30#3793 = )), lambda x_30#3793, false)), lambda x_29#3783, false))[1][0] AS Pos2#3773, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3784, lambdafunction((isnotnull(lambda x_30#3794) AND NOT (lambda x_30#3794 = )), lambda x_30#3794, false)), lambda x_29#3784, false))[1][1] AS Brid2#3774, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3785, lambdafunction((isnotnull(lambda x_30#3795) AND NOT (lambda x_30#3795 = )), lambda x_30#3795, false)), lambda x_29#3785, 
false))[1][2] AS Emid2#3775, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3786, lambdafunction((isnotnull(lambda x_30#3796) AND NOT (lambda x_30#3796 = )), lambda x_30#3796, false)), lambda x_29#3786, false))[2][0] AS Pos3#3776, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3787, lambdafunction((isnotnull(lambda x_30#3797) AND NOT (lambda x_30#3797 = )), lambda x_30#3797, false)), lambda x_29#3787, false))[2][1] AS Brid3#3777, filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3788, lambdafunction((isnotnull(lambda x_30#3798) AND NOT (lambda x_30#3798 = )), lambda x_30#3798, false)), lambda x_29#3788, false))[2][2] AS Emid3#3778, size(filter(array(array(Pos3#3706, Brid3#3707, Emid3#3708), array(Pos2#3703, Brid2#3704, Emid2#3705), array(Pos1#3700, Brid1#3701, Emid1#3702)), lambdafunction(exists(lambda x_29#3789, lambdafunction((isnotnull(lambda x_30#3799) AND NOT (lambda x_30#3799 = )), lambda x_30#3799, false)), lambda x_29#3789, false)), true) AS Hierarchy_level#3779]
# +- *(1) Scan ExistingRDD[Name#3697,Position#3698,Age#3699L,Pos1#3700,Brid1#3701,Emid1#3702,Pos2#3703,Brid2#3704,Emid2#3705,Pos3#3706,Brid3#3707,Emid3#3708]
1条答案
按热度按时间fdx2calv1#
对于您的 30 组列,这是一种稍显复杂但高效的方法:
仅使用 3 组列进行测试。
输入 DataFrame:
脚本和结果:
效率证明: