PySpark:使用字典转换Map列键

qxsslcnc  于 2022-11-01  发布在  Spark
关注(0)|答案(3)|浏览(230)

我有一个PySpark DataFrame,其中有一个map列,如下所示:

root
 |-- id: long (nullable = true)
 |-- map_col: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)

map_col有需要根据字典进行转换的关键字。例如,字典可能是:

mapping = {'a': '1', 'b': '2', 'c': '5', 'd': '8' }

因此,DataFrame需要从:

[Row(id=123, map_col={'a': 0.0, 'b': -42.19}),
  Row(id=456, map_col={'a': 13.25, 'c': -19.6, 'd': 15.6})]

更改为:

[Row(id=123, map_col={'1': 0.0, '2': -42.19}),
  Row(id=456, map_col={'1': 13.25, '5': -19.6, '8': 15.6})]

我发现如果我可以写出字典,transform_keys是一个选项,但是它太大了,而且是在工作流的早期动态生成的。我认为explode/pivot也可以工作,但是似乎没有性能?
有什么想法吗?

编辑:添加了一点以显示map_colmap的大小不一致。

yeotifhr

yeotifhr1#

一种使用RDD变换的方法。

def updateKey(theDict, mapDict):
    """
    update theDict's key using mapDict
    """

    updDict = []
    for item in theDict.items():
        updDict.append((mapDict[item[0]] if item[0] in mapDict.keys() else item[0], item[1]))

    return dict(updDict)

data_sdf.rdd. \
    map(lambda r: (r[0], r[1], updateKey(r[1], mapping))). \
    toDF(['id', 'map_col', 'new_map_col']). \
    show(truncate=False)

# +---+-----------------------------------+-----------------------------------+

# |id |map_col                            |new_map_col                        |

# +---+-----------------------------------+-----------------------------------+

# |123|{a -> 0.0, b -> -42.19, e -> 12.12}|{1 -> 0.0, 2 -> -42.19, e -> 12.12}|

# |456|{a -> 13.25, c -> -19.6, d -> 15.6}|{8 -> 15.6, 1 -> 13.25, 5 -> -19.6}|

# +---+-----------------------------------+-----------------------------------+
  • 另外,我在map_col的第一行中添加了一个新键,以显示如果没有可用的Map会发生什么 *
zpf6vheq

zpf6vheq2#

transform_keys可以使用lambda,如示例所示,它并不局限于expr。但是,lambda或Python可调用对象需要使用pyspark.sql.functions中定义的函数、Column方法或Scala UDF。因此,使用引用mapping字典对象的Python UDF目前还不可能通过这种机制实现。通过将mapping中的键值对展开为链接的when条件。请参见下面的示例来说明这个想法:

from typing import Dict, Callable
from functools import reduce

from pyspark.sql.functions import Column, when, transform_keys
from pyspark.sql import SparkSession

def apply_mapping(mapping: Dict[str, str]) -> Callable[[Column, Column], Column]:

    def convert_mapping_into_when_conditions(key: Column, _: Column) -> Column:
        initial_key, initial_value = mapping.popitem()
        initial_condition = when(key == initial_key, initial_value)
        return reduce(lambda x, y: x.when(key == y[0], y[1]), mapping.items(), initial_condition)

    return convert_mapping_into_when_conditions

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("Temp")\
        .getOrCreate()
    df = spark.createDataFrame([(1, {"foo": -2.0, "bar": 2.0})], ("id", "data"))
    mapping = {'foo': 'a', 'bar': 'b'}
    df.select(transform_keys(
        "data", apply_mapping(mapping)).alias("data_transformed")
              ).show(truncate=False)

上面的输出是:

+---------------------+
|data_transformed     |
+---------------------+
|{b -> 2.0, a -> -2.0}|
+---------------------+

这说明已成功地将定义的Map(foo -> a, bar -> b)应用于列。apply_mapping函数应该足够通用,以便在您自己的管道中复制和使用。

w41d8nur

w41d8nur3#

另一个道:
使用itertools创建一个表达式注入到pysparks transform_keys函数中。使用upper只是为了以防万一。代码如下

from itertools import chain

m_expr1 = create_map([lit(x) for x in chain(*mapping.items())])

new =df.withColumn('new_map_col',transform_keys("map_col", lambda k, _: upper(m_expr1[k])))

new.show(truncate=False)

+---+-----------------------------------+-----------------------------------+
|id |map_col                            |new_map_col                        |
+---+-----------------------------------+-----------------------------------+
|123|{a -> 0.0, b -> -42.19}            |{1 -> 0.0, 2 -> -42.19}            |
|456|{a -> 13.25, c -> -19.6, d -> 15.6}|{1 -> 13.25, 5 -> -19.6, 8 -> 15.6}|
+---+-----------------------------------+-----------------------------------+

相关问题