如何显示行作为字典从pyspark Dataframe ?

vhmi4jdf  于 2022-12-22  发布在  Spark
关注(0)|答案(1)|浏览(155)

对pyspark来说非常新。
我有两个数据集,EventsGadget。它们看起来像这样:

Events

[![在此输入图像说明][1]][1]
Gadgets
[![在此输入图像说明][2]][2]
我可以像这样使用读取并连接2个 Dataframe ,并在最后一行中仅显示所需的列:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

spark = SparkSession.builder.appName('PySpark Read CSV').getOrCreate()

# Reading csv file
events = spark.read.option("header",True).csv("events.csv")
events.printSchema()

gadgets = spark.read.option("header",True).csv("gadgets.csv")
gadgets.printSchema()

enrich = events.join(gadgets, events.deviceId == gadgets.ID).select(events["*"],gadgets["User"])

我的任务要求我在dictionary对象中这样表示数据:
丰富任务:

  • 使用设备提供的用户数据丰富事件对象。
  • 确保丰富的事件如下所示:
{
    sessionId: string
    deviceId: string
    timestamp: timestamp
    type: emun(ADDED_TO_CART | APP_OPENED)
    total_price: 50.00
    user: string
}

我可以处理赋值语句要求的dtype更改和列名重命名,但是我如何以上面的字典格式提供结果呢?
如果我使用这行代码,我甚至不确定如何显示结果:

enrich.rdd.map(lambda row: row.asDict())
b91juud3

b91juud31#

使用create_map()函数创建每列及其值的(key,value)对。
create_map要求输入的格式为(键1,值1,键2,值2,...)。为此,请使用itertools. chain()。

df = spark.createDataFrame(data=[["sess1","dev1","2022-12-19","emun(ADDED_TO_CART | APP_OPENED)","50.00","usr1"],["sess2","dev2","2022-12-18","emun(ADDED_TO_CART | APP_OPENED)","100.00","usr2"]], schema=["sessionId","deviceId","timestamp","type","total_price","user"])

import pyspark.sql.functions as F
import itertools

df = df.withColumn("map", \
                   F.create_map( \
                       list(itertools.chain( \
                           *((F.lit(x), F.col(x)) for x in df.columns) \
                       )) \
                   ))

df.show(truncate=False)

输出:

+---------+--------+----------+--------------------------------+-----------+----+----------------------------------------------------------------------------------------------------------------------------------------------+
|sessionId|deviceId|timestamp |type                            |total_price|user|map                                                                                                                                           |
+---------+--------+----------+--------------------------------+-----------+----+----------------------------------------------------------------------------------------------------------------------------------------------+
|sess1    |dev1    |2022-12-19|emun(ADDED_TO_CART | APP_OPENED)|50.00      |usr1|{sessionId -> sess1, deviceId -> dev1, timestamp -> 2022-12-19, type -> emun(ADDED_TO_CART | APP_OPENED), total_price -> 50.00, user -> usr1} |
|sess2    |dev2    |2022-12-18|emun(ADDED_TO_CART | APP_OPENED)|100.00     |usr2|{sessionId -> sess2, deviceId -> dev2, timestamp -> 2022-12-18, type -> emun(ADDED_TO_CART | APP_OPENED), total_price -> 100.00, user -> usr2}|
+---------+--------+----------+--------------------------------+-----------+----+----------------------------------------------------------------------------------------------------------------------------------------------+

您也可以使用以下命令将其收集为json

df = df.withColumn("json", F.to_json("map"))

相关问题