计算非升序字符串的最大值

jxct1oxe  于 2021-07-09  发布在  Spark
关注(0)|答案(2)|浏览(350)

我有一个每季度存储水平的Dataframe, df1 :

| id  | year | quarter | level  |
|-----|------|---------|--------|
| 111 | 2021 | 1       | Silver |
| 111 | 2021 | 2       | Gold   |
| 222 | 2021 | 1       | Bronze |
| 222 | 2021 | 2       | Silver |

我还有另一个Dataframe,存储相同的数据,但不按季度分组, df2 :

| id  | level  |
|-----|--------|
| 111 | Bronze |
| 222 | Gold   |

我想计算两个Dataframe的最大级别,但由于(g)old<(s)ilver,无法使用max。有没有一种方法来做一个定制的最大捕获金>银>铜的规则?
我的预期输出是这样的。

| id  | year | quarter | level  |
|-----|------|---------|--------|
| 111 | 2021 | 1       | Silver |
| 111 | 2021 | 2       | Gold   |
| 222 | 2021 | 1       | Gold   |
| 222 | 2021 | 2       | Gold   |

在遇到这个问题之前我试过:

output = (
    df1.join(df2, on = ['id'])
    .groupby('id', 'year', 'quarter')
    .agg(
        F.max(F.col('level')).alias('level') #would rank Silver greater than Gold
    )
)
mkshixfv

mkshixfv1#

您还可以使用 when 表达与使用 greatest 函数以获取最大值:

import pyspark.sql.functions as F

order = (F.when(F.col("level") == "Gold", 3)
         .when(F.col("level") == "Silver", 2)
         .when(F.col("level") == "Bronze", 1))

df1 = df1.withColumn("level", F.struct(order, F.col("level")))
df2 = df2.withColumn("level", F.struct(order, F.col("level")))

result = df1.alias("df1").join(df2.alias("df2"), ["id"]).select(
    "id", "year", "quarter",
    F.greatest(F.col("df1.level"), F.col("df2.level")).getField("level").alias("level")
)

result.show()

# +---+----+-------+------+

# | id|year|quarter| level|

# +---+----+-------+------+

# |222|2021|      1|  Gold|

# |222|2021|      2|  Gold|

# |111|2021|      1|Silver|

# |111|2021|      2|  Gold|

# +---+----+-------+------+

或者通过使用定义顺序的Map文字,以及在结构上使用max的相同方法:

order = F.create_map(*[F.lit(l) for l in ['Gold', 3, 'Silver', 2, 'Bronze', 1]])

df1 = df1.withColumn("level", F.struct(order.getItem(F.col("level")), F.col("level")))
df2 = df2.withColumn("level", F.struct(order.getItem(F.col("level")), F.col("level")))

result = df1.alias("df1").join(df2.alias("df2"), ["id"]).select(
    "id", "year", "quarter",
    F.greatest(F.col("df1.level"), F.col("df2.level")).getField("level").alias("level")
)
fruv7luv

fruv7luv2#

您可以创建一个Map数组列,以便于按数组索引排序,并使用 greatest 得到你想要计算的最高水平。

import pyspark.sql.functions as F

df = df1.alias('df1').join(df2.alias('df2'), 'id').select(
    'id', 'year', 'quarter', 
    F.expr("""
        array('Bronze', 'Silver', 'Gold')[
            greatest(
                map('Bronze', 0, 'Silver', 1, 'Gold', 2)[df1.level],
                map('Bronze', 0, 'Silver', 1, 'Gold', 2)[df2.level]
            )
        ] as level
    """)
)

df.show()
+---+----+-------+------+
| id|year|quarter| level|
+---+----+-------+------+
|111|2021|      1|Silver|
|111|2021|      2|  Gold|
|222|2021|      1|  Gold|
|222|2021|      2|  Gold|
+---+----+-------+------+

对于较新的spark版本,可以使用 array_position :

df = df1.alias('df1').join(df2.alias('df2'), 'id').withColumn(
    'mapping', 
    F.expr("array('Bronze', 'Silver', 'Gold')")
).select(
    'id', 'year', 'quarter', 
    F.col('mapping')[
        F.expr("greatest(array_position(mapping, df1.level), array_position(mapping, df2.level)) - 1")
    ].alias('level')
)

相关问题