对pyspark来说非常新。
我有两个数据集,Events
和Gadget
。它们看起来像这样:
Events
[![在此输入图像说明][1]][1]Gadgets
[![在此输入图像说明][2]][2]
我可以像这样使用读取并连接2个 Dataframe ,并在最后一行中仅显示所需的列:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
spark = SparkSession.builder.appName('PySpark Read CSV').getOrCreate()
# Reading csv file
events = spark.read.option("header",True).csv("events.csv")
events.printSchema()
gadgets = spark.read.option("header",True).csv("gadgets.csv")
gadgets.printSchema()
enrich = events.join(gadgets, events.deviceId == gadgets.ID).select(events["*"],gadgets["User"])
我的任务要求我在dictionary对象中这样表示数据:
丰富任务:
- 使用设备提供的用户数据丰富事件对象。
- 确保丰富的事件如下所示:
{
sessionId: string
deviceId: string
timestamp: timestamp
type: emun(ADDED_TO_CART | APP_OPENED)
total_price: 50.00
user: string
}
我可以处理赋值语句要求的dtype更改和列名重命名,但是我如何以上面的字典格式提供结果呢?
如果我使用这行代码,我甚至不确定如何显示结果:
enrich.rdd.map(lambda row: row.asDict())
1条答案
按热度按时间b91juud31#
使用create_map()函数创建每列及其值的(key,value)对。
create_map
要求输入的格式为(键1,值1,键2,值2,...)。为此,请使用itertools. chain()。输出:
您也可以使用以下命令将其收集为json: