将一个pyspark Dataframe 的值替换为另一个

vpfxa7rd  于 2022-12-17  发布在  Spark
关注(0)|答案(2)|浏览(113)

我有一个pyspark Dataframe df2:-
| 识别号|总计_计数|最终_A|最终_B|最终_C|最终_D|
| - ------|- ------|- ------|- ------|- ------|- ------|
| 十一|八十个|三十六|三十|八个|六个|
| 四个|八十个|三十六|三十|八个|六个|
| 十三|六十五|三十|二十四|六个|五个|
| 十二|五十六|二十六|二十一|五个|四个|
| 第二章|六十五|三十|二十四|六个|五个|
| 1个|五十六|二十六|二十一|五个|四个|
我有另一个 Dataframe df1:-
| 识别号|总计_计数|A类|B| C级|D级|
| - ------|- ------|- ------|- ------|- ------|- ------|
| 四个|八十个|无|无|三个|无|
| 十一|八十个|无|无|无|无|
| 十三|六十五|无|无|无|无|
| 十二|五十六|无|四个|无|无|
| 第二章|六十五|无|无|无|无|
| 1个|五十六|无|无|无|无|
| 十个|三十四|十个|十个|十个|四个|
我想用df2替换ID(主键)的值。应为df1:-
| 识别号|总计_计数|A类|B| C级|D级|
| - ------|- ------|- ------|- ------|- ------|- ------|
| 十一|八十个|三十六|三十|八个|六个|
| 四个|八十个|三十六|三十|八个|六个|
| 十三|六十五|三十|二十四|六个|五个|
| 十二|五十六|二十六|二十一|五个|四个|
| 第二章|六十五|三十|二十四|六个|五个|
| 1个|五十六|二十六|二十一|五个|四个|
| 十个|三十四|十个|十个|十个|四个|

pb3skfrl

pb3skfrl1#

df2=spark.read.option("header","True").option("inferSchema","True").csv("df1.csv")
df1=spark.read.option("header","True").option("inferSchema","True").csv("df2.csv")

df2 = df2.withColumnRenamed("ID",'df2_ID').withColumnRenamed("Total_Count",'df2_Total_Count')

final_df = df1.join(df2,(df1.ID ==  df2.df2_ID) & (df1.Total_Count ==  df2.df2_Total_Count),"left")

from pyspark.sql.functions import when
for i in ('A','B','C','D'):
  final_df = final_df.withColumn(i, when(final_df[i] == 0, final_df["Final_{}".format(i)]).otherwise(final_df[i]))

cols = df2.columns
final_df = final_df.drop(*cols)
ajsxfq5m

ajsxfq5m2#

df = df1.join(df2.select('Final_A', 'Final_B', 'Final_C', 'Final_D'), 'ID'], 'left')
df =df.withColumn('A', coalesce(df['Final_A'],df['A'])).\
       withColumn('B', coalesce(df['Final_B'],df['B'])).\
       withColumn('C', coalesce(df['Final_C'],df['C'])).\
       withColumn('D', coalesce(df['Final_D'],df['D']))

df1 = df.select('ID', 'Total_Count','A', 'B', 'C', 'D')

df1.show()

相关问题