目前,我已经定义了文件结构,所以dataframe模式如下(使用不同来源的样本数据)
schema = StructType([
StructField('id', StringType(), True),
StructField('dept', StringType(), True),
StructField('salary', IntegerType(), True),
StructField('location', StringType(), True)
])
df = spark.createDataFrame([(36636,'Finance' ,3000, 'USA'),
(40288,'Finance' , 5000, 'IND'),
(42114,'Sales', 3900,'USA'),
(39192,'Marketing',2500, 'CAN'),
(34534,'Sales', 6500, 'USA')],
schema=schema)
我做下面的操作,以创建Map类型为2列如下
df = df.withColumn("propertiesMap",create_map(
lit("salary"),col("salary"),
lit("location"),col("location")
)).drop("salary","location")
我的数据框看起来像这样
+-----+---------+---------------------------------+
|id |dept |propertiesMap |
+-----+---------+---------------------------------+
|36636|Finance |[salary -> 3000, location -> USA]|
|40288|Finance |[salary -> 5000, location -> IND]|
|42114|Sales |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+
接下来,输入文件可能具有动态列,如
file1.csv
id,dept,Salary,location
file2.csv
id,dept,salary
file3.csv
id,dept
file4.csv
id,dept,firstname,lastname,middlename,address
在所有情况下,id,dept
都不会改变,但所有其他列都是动态的。
例如,取上面的file1.csv
fixed_columns = [id,dept]
all_columns= df.columns
dynamic_col = list(set(all_columns) - set(fixed_columns))
给予
dynamic_col = [salary, location]
And I want something like and to use append?. Not sure
for i in dynamic_col;
df = df.withColumn('propertiesMap', create_map( lit(i), col(i)))
一旦所有的文件得到处理并附加到最终的 Dataframe ,它必须看起来像
+-----+---------+------------------------------------------------------+
|id |dept |propertiesMap |
+-----+---------+------------------------------------------------------+
|36636|Finance |[salary -> 3000, location -> USA] |
|40288|Finance |[salary -> 5000, location -> IND] |
|42114|Sales |[salary -> 3900, location -> USA] |
|39192|Marketing|[salary -> 2500, location -> CAN] |
|34534|Sales |[salary -> 6500, location -> USA] |
|36636|Finance |[firstname -> kevin, lastname -> Miller] |
|40288|Finance |[firstname -> aaron, lastname -> sahn] |
|42114|Sales |[firstname -> daron, lastname -> ket] |
|39192|Marketing|[] |
|34534|Sales |[firstname -> dev, lastname -> dis, middlename -> Sam]|
+-----+---------+------------------------------------------------------+
我不用Pandas。
2条答案
按热度按时间gopyfrb31#
像MapType()这样的原始数据类型的目的是拥有一个分层的数据结构。而不是追加和加倍你的df长度,我会确保每个id和dept一行。如果是这样,那么让我们考虑:
原始df
csv1 df
csv2 df
根据动态添加的 Dataframe 的大小,您有两个选项,**1.**要么在追加到现有df之前先读取并存储在列表中。**2.在读取并追加到现有df时循环。无论哪种选项,您都需要一个for循环。在这种情况下,我使用了一个df的列表。在for循环中3.**union existing with new df.**4.**groupby和agg将目标maptype列放入数组中。**5.**利用高阶函数将上面获取的数组转换回map中。见下面的代码;
hof1towb2#
我想你应该尝试
map_concat
在pyspark为您的要求如果你想找多个数据类型的Map,请 checkout 这篇文章https://stackoverflow.com/a/67088231/7224372