Adding a column to a PySpark DataFrame from another DataFrame

umuewwlo · asked 2021-05-16 · in Spark

I have this:

df_e :=     
|country, name, year, c2, c3, c4|       
|Austria, Jon Doe, 2003, 21.234, 54.234, 345.434|       
...

df_p :=     
|name, 2001, 2002, 2003, 2004|       
|Jon Doe, 2849234, 12384312, 123908234, 12398193|       
...

Both PySpark DataFrames are read from CSV files.
How can I create a new column named "amount" in df_e that, for each record, looks up the name and year of that record in df_p and pulls out the corresponding amount? Using PySpark.
In that case I should end up with the following DataFrame:

df_e :=     
|country, name, year, c2, c3, c4, amount|       
|Austria, Jon Doe, 2003, 21.234, 54.234, 345.434, 123908234|       
...

Thanks for your help!
Edit:
This is how I read the files:

from pyspark import SparkContext, SparkConf       
from pyspark.sql import SparkSession       

sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))       
spark = SparkSession.builder.getOrCreate()       

df_e = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/e.csv')       
df_p = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/p.csv')

I am just getting started with PySpark, so I don't know which functions I could use to solve this.
In pandas I would do it by iterating over the DataFrame, something like this:

p = []
for i in df_e.index:
    p.append(df_p.query('name == "{}"'.format(df_e['name'][i]))['{}'.format(df_e['year'][i])])

and then add the list p as a new column to df_e (although I know there is probably a better way to do it).
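As an aside, the loop above can be replaced by a vectorized pandas approach: melt the year columns of df_p into long form and merge on (name, year). This is a sketch assuming df_e and df_p are pandas DataFrames shaped like the samples above (the inline data is illustrative):

```python
import pandas as pd

# Sample data mirroring the question (illustrative values only)
df_e = pd.DataFrame({'country': ['Austria'], 'name': ['Jon Doe'],
                     'year': [2003], 'c2': [21.234], 'c3': [54.234],
                     'c4': [345.434]})
df_p = pd.DataFrame({'name': ['Jon Doe'], '2001': [2849234],
                     '2002': [12384312], '2003': [123908234],
                     '2004': [12398193]})

# melt turns the wide year columns into (name, year, amount) rows,
# so the lookup becomes a plain merge instead of a Python loop.
long_p = df_p.melt(id_vars='name', var_name='year', value_name='amount')
long_p['year'] = long_p['year'].astype(int)   # match df_e's integer years

result = df_e.merge(long_p, on=['name', 'year'], how='left')
```

The same melt-then-join idea is what the PySpark answer below implements with explode.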

flseospp


import pyspark.sql.functions as F

### assuming every column except the first one is a year;
### you can also specify the list manually: ['2003', '2005'] etc.

columns_to_transpose = df_p.columns[1:]
k = []
for x in columns_to_transpose:
    k.append(F.struct(F.lit(x).alias('year'), F.col(x).alias('year_value')))

# explode one struct per year column into its own row
df_p_new = (df_p.withColumn('New', F.explode(F.array(k)))
                .select(F.col('name').alias('JOIN_NAME'),
                        F.col('New')['year'].alias('NEW_YEAR'),
                        F.col('New')['year_value'].alias('YEAR_VALUE')))

>>> df_p_new.show()
+---------+--------+----------+
|JOIN_NAME|NEW_YEAR|YEAR_VALUE|
+---------+--------+----------+
|  Jon Doe|    2001|   2849234|
|  Jon Doe|    2002|  12384312|
|  Jon Doe|    2003| 123908234|
|  Jon Doe|    2004|  12398193|
+---------+--------+----------+

## Column Names are case sensitive

df_answer = (df_e.join(df_p_new,
                       (df_p_new.JOIN_NAME == df_e.name) &
                       (df_p_new.NEW_YEAR == df_e.year),
                       how='left')
                 .select(*df_e.columns, 'YEAR_VALUE'))
df_answer.show()

+-------+--------+----+------+------+-------+----------+
|country|    name|year|    c2|    c3|     c4|YEAR_VALUE|
+-------+--------+----+------+------+-------+----------+
|Austria| Jon Doe|2003|21.234|54.234|345.434| 123908234|
+-------+--------+----+------+------+-------+----------+

## rename YEAR_VALUE to the column name asked for in the question
df_answer = df_answer.withColumnRenamed('YEAR_VALUE', 'amount')
