我有一个PySpark DataFrame,其中有一个map列,如下所示:
root
|-- id: long (nullable = true)
|-- map_col: map (nullable = true)
| |-- key: string
| |-- value: double (valueContainsNull = true)
map_col
有需要根据字典进行转换的关键字。例如,字典可能是:
mapping = {'a': '1', 'b': '2', 'c': '5', 'd': '8' }
因此,DataFrame
需要从:
[Row(id=123, map_col={'a': 0.0, 'b': -42.19}),
Row(id=456, map_col={'a': 13.25, 'c': -19.6, 'd': 15.6})]
更改为:
[Row(id=123, map_col={'1': 0.0, '2': -42.19}),
Row(id=456, map_col={'1': 13.25, '5': -19.6, '8': 15.6})]
我发现如果我可以写出字典,transform_keys
是一个选项,但是它太大了,而且是在工作流的早期动态生成的。我认为explode
/pivot
也可以工作,但是似乎没有性能?
有什么想法吗?
编辑:添加了一点以显示map_col
中map
的大小不一致。
3条答案
按热度按时间yeotifhr1#
一种使用RDD变换的方法。
zpf6vheq2#
transform_keys
可以使用lambda
,如示例所示,它并不局限于expr
。但是,lambda
或Python可调用对象需要使用pyspark.sql.functions
中定义的函数、Column
方法或Scala UDF。因此,使用引用mapping
字典对象的Python UDF目前还不可能通过这种机制实现。通过将mapping
中的键值对展开为链接的when
条件。请参见下面的示例来说明这个想法:上面的输出是:
这说明已成功地将定义的Map(
foo -> a, bar -> b
)应用于列。apply_mapping
函数应该足够通用,以便在您自己的管道中复制和使用。w41d8nur3#
另一个道:
使用itertools创建一个表达式注入到pysparks transform_keys函数中。使用upper只是为了以防万一。代码如下