如何在pyspark Dataframe 中动态添加列/值到Map Type

wvt8vs2t  于 2023-04-11  发布在  Spark
关注(0)|答案(2)|浏览(180)

目前,我已经定义了文件结构,所以dataframe模式如下(使用不同来源的样本数据)

schema = StructType([
     StructField('id', StringType(), True),
     StructField('dept', StringType(), True),
     StructField('salary', IntegerType(), True),
     StructField('location', StringType(), True)
     ])

df = spark.createDataFrame([(36636,'Finance'  ,3000,  'USA'),
(40288,'Finance' ,  5000, 'IND'),
(42114,'Sales',    3900,'USA'),
(39192,'Marketing',2500, 'CAN'),
(34534,'Sales',    6500,  'USA')],
schema=schema)

我做下面的操作,以创建Map类型为2列如下

df = df.withColumn("propertiesMap",create_map(
        lit("salary"),col("salary"),
        lit("location"),col("location")
        )).drop("salary","location")

我的数据框看起来像这样

+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |[salary -> 3000, location -> USA]|
|40288|Finance  |[salary -> 5000, location -> IND]|
|42114|Sales    |[salary -> 3900, location -> USA]|
|39192|Marketing|[salary -> 2500, location -> CAN]|
|34534|Sales    |[salary -> 6500, location -> USA]|
+-----+---------+---------------------------------+

接下来,输入文件可能具有动态列,如

file1.csv
id,dept,Salary,location

file2.csv
id,dept,salary

file3.csv
id,dept

file4.csv
id,dept,firstname,lastname,middlename,address

在所有情况下,id,dept都不会改变,但所有其他列都是动态的。
例如,取上面的file1.csv

fixed_columns = [id,dept]

all_columns= df.columns

dynamic_col = list(set(all_columns) - set(fixed_columns))

给予

dynamic_col = [salary, location]

And I want something like and to use append?. Not sure

for i in dynamic_col;
     df = df.withColumn('propertiesMap', create_map( lit(i), col(i)))

一旦所有的文件得到处理并附加到最终的 Dataframe ,它必须看起来像

+-----+---------+------------------------------------------------------+
|id   |dept     |propertiesMap                                         |
+-----+---------+------------------------------------------------------+
|36636|Finance  |[salary -> 3000, location -> USA]                     |
|40288|Finance  |[salary -> 5000, location -> IND]                     |
|42114|Sales    |[salary -> 3900, location -> USA]                     |
|39192|Marketing|[salary -> 2500, location -> CAN]                     |
|34534|Sales    |[salary -> 6500, location -> USA]                     |
|36636|Finance  |[firstname -> kevin, lastname -> Miller]              |
|40288|Finance  |[firstname -> aaron, lastname -> sahn]                |
|42114|Sales    |[firstname -> daron, lastname -> ket]                 |
|39192|Marketing|[]                                                    |
|34534|Sales    |[firstname -> dev, lastname -> dis, middlename -> Sam]|
+-----+---------+------------------------------------------------------+

我不用Pandas。

gopyfrb3

gopyfrb31#

像MapType()这样的原始数据类型的目的是拥有一个分层的数据结构。而不是追加和加倍你的df长度,我会确保每个id和dept一行。如果是这样,那么让我们考虑:
原始df

+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |{salary -> 3000, location -> USA}|
|40288|Finance  |{salary -> 5000, location -> IND}|
|42114|Sales    |{salary -> 3900, location -> USA}|
|39192|Marketing|{salary -> 2500, location -> CAN}|
|34534|Sales    |{salary -> 6500, location -> USA}|
+-----+---------+---------------------------------+

csv1 df

+-----+---------+--------------------------------------------------------+
|id   |dept     |propertiesMap_new                                       |
+-----+---------+--------------------------------------------------------+
|36636|Finance  |{firstname -> Miller, lastname -> kevin, middlename -> }|
|40288|Finance  |{firstname -> aaron, lastname -> sahn, middlename -> }  |
|42114|Sales    |{firstname -> daron, lastname -> ket, middlename -> }   |
|39192|Marketing|{firstname -> , lastname -> , middlename -> }           |
+-----+---------+--------------------------------------------------------+

csv2 df

+-----+-----+------------------------------------------------------+
|id   |dept |propertiesMap_new                                     |
+-----+-----+------------------------------------------------------+
|34534|Sales|{firstname -> dev, lastname -> dis, middlename -> Sam}|
+-----+-----+------------------------------------------------------+

根据动态添加的 Dataframe 的大小,您有两个选项,**1.**要么在追加到现有df之前先读取并存储在列表中。**2.在读取并追加到现有df时循环。无论哪种选项,您都需要一个for循环。在这种情况下,我使用了一个df的列表。在for循环中3.**union existing with new df.**4.**groupby和agg将目标maptype列放入数组中。**5.**利用高阶函数将上面获取的数组转换回map中。见下面的代码;

lst=[df2,df1]
for l in lst:
  df=df.union(l)
df.groupby('id','dept').agg(collect_list('propertiesMap').alias('propertiesMap')).select('id','dept',
        f.expr('aggregate(slice(propertiesMap, 2, size(propertiesMap)), propertiesMap[0], (acc, element) -> map_concat(acc, element))').alias('propertiesMap')
        ).show(truncate=False)


    +-----+---------+-----------------------------------------------------------------------------------------+
    |id   |dept     |propertiesMap                                                                            |
    +-----+---------+-----------------------------------------------------------------------------------------+
    |36636|Finance  |{salary -> 3000, location -> USA, firstname -> Miller, lastname -> kevin, middlename -> }|
    |40288|Finance  |{salary -> 5000, location -> IND, firstname -> aaron, lastname -> sahn, middlename -> }  |
    |42114|Sales    |{salary -> 3900, location -> USA, firstname -> daron, lastname -> ket, middlename -> }   |
    |39192|Marketing|{salary -> 2500, location -> CAN, firstname -> , lastname -> , middlename -> }           |
    |34534|Sales    |{salary -> 6500, location -> USA, firstname -> dev, lastname -> dis, middlename -> Sam}  |
    +-----+---------+-----------------------------------------------------------------------------------------+
hof1towb

hof1towb2#

我想你应该尝试map_concat在pyspark为您的要求

str_dynamic_col = [location, ...]
int_dynamic_col =[salary, ...]

df=df.withColumn('INTpropertiesMap', F.create_map().cast(T.MapType(T.StringType(), T.IntegerType())))
df=df.withColumn('STRpropertiesMap', F.create_map().cast(T.MapType(T.StringType(), T.StringType())))

for i in str_dynamic_col:
     df = df.withColumn('STRpropertiesMap', 
          F.map_concat(
               F.col('STRpropertiesMap'),
               F.create_map( F.lit(i), F.col(i))
          )
     )

for i in int_dynamic_col:
     df = df.withColumn('INTpropertiesMap', 
          F.map_concat(
               F.col('INTpropertiesMap'),
               F.create_map( F.lit(i), F.col(i))
          )
     )

如果你想找多个数据类型的Map,请 checkout 这篇文章https://stackoverflow.com/a/67088231/7224372

相关问题