PySpark:根据列中值和字典创建列

kb5ga3dv  于 2022-12-03  发布在  Spark
关注(0)|答案(2)|浏览(262)

我有一个PySpark数据框架,其中包含值和字典,字典为值提供文本Map。

| value    | dict                                           | 
| -------- | ---------------------------------------------- |
| 1        | {"1": "Text A", "2": "Text B"}                 |
| 2        | {"1": "Text A", "2": "Text B"}                 |
| 0        | {"0": "Another text A", "1": "Another text B"} |

我想创建一个包含正确Map的“status”列。

| value    | dict                             | status   |
| -------- | -------------------------------  | -------- |
| 1        | {"1": "Text A", "2": "Text B"}   | Text A   |
| 2        | {"1": "Text A", "2": "Text B"}   | Text B   |
| 0        | {"0": "Other A", "1": "Other B"} | Other A  |

我试过这个代码:

df.withColumn("status", F.col("dict").getItem(F.col("value"))

这段代码不起作用。对于一个硬编码的值,比如“2”,同样的代码确实提供了输出,但当然不是正确的输出:

df.withColumn("status", F.col("dict").getItem("2"))

有人能帮助我在状态列中获得正确的Map值吗?
编辑:我的代码确实工作了,除了我的“值”是一个double,而dict中的键是字符串。当将列从double转换为int再转换为string时,代码工作了。

xtfmy6hx

xtfmy6hx1#

这是我的2分钱
1.通过从CSV或任何其他源(在我的例子中,它只是静态数据)阅读来创建 Dataframe

from pyspark.sql.types import *

 data = [
 (1 , {"1": "Text A", "2": "Text B"}),
 (2 , {"1": "Text A", "2": "Text B"}),
 (0 , {"0": "Another text A", "1": "Another text B"} )
 ]

 schema = StructType([
                     StructField("ID",StringType(),True),
                     StructField("Dictionary",MapType(StringType(),StringType()),True),
                     ])

 df = spark.createDataFrame(data,schema=schema)
 df.show(truncate=False)

1.然后直接提取基于id的字典值作为关键字。

df.withColumn('extract',df.Dictionary[df.ID]).show(truncate=False)

检查下图以供参考:

des4xlb0

des4xlb02#

希望这对你有帮助。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

if __name__ == '__main__':
    spark = SparkSession.builder.appName('Medium').master('local[1]').getOrCreate()
    df = spark.read.format('csv').option("header","true").option("delimiter","|").load("/Users/dshanmugam/Desktop/ss.csv")
    schema = StructType([
        StructField("1", StringType(), True)
    ])

    def return_value(data):
        key = data.split('-')[0]
        value = json.loads(data.split('-')[1])[key]
        return value

    returnVal = udf(return_value)
    df_new = df.withColumn("newCol",concat_ws("-",col("value"),col("dict"))).withColumn("result",returnVal(col("newCol")))
    df_new.select(["value","result"]).show(10,False)

结果:

+-----+--------------+
|value|result        |
+-----+--------------+
|1    |Text A        |
|2    |Text B        |
|0    |Another text A|
+-----+--------------+

我正在使用UDF。如果性能是一个问题,您可以尝试使用一些其他选项。

相关问题