pyspark 从两个完全相同的spark python Dataframe 中选择不匹配的列和值

z9ju0rcb  于 2023-03-01  发布在  Spark
关注(0)|答案(1)|浏览(169)

我想从不同来源的两个完全相同的 Dataframe 中选择不匹配的列及其值。

    • 我现在拥有的:**

| 列_1|键列|第二栏|第三列|第四栏|
| - ------|- ------|- ------|- ------|- ------|
| 项目a|键1|b.人口基金|(c)秘书长的报告|日|
| w|键2|x|Y型|z|
| 列_1|键列|第二栏|第三列|第四栏|
| - ------|- ------|- ------|- ------|- ------|
| 项目a|键1|b.人口基金|p|q|
| w|键2|x|Y型|z|
我有来自不同数据源的相同模式的2个 Dataframe 。

    • 我想要的**

使用"key_col"作为连接键连接(内部连接)2个 Dataframe ,并以以下格式给出输出:
对于联接后获得的表中的每一行,返回以下行:
| 键列|列名不匹配|第一个df中的值不匹配|秒df中的值不匹配|
| - ------|- ------|- ------|- ------|
| 键1|[第3栏、第4栏]|[c、d]|[p,q]|
我正在寻找查询这样做在pyspark。

xqkwcwgp

xqkwcwgp1#

这是可行的:

df1.alias("df1").join(df2.alias("df2"), F.col("df1.key_col")==F.col("df2.key_col"))\
 .select("df1.key_col", *[F.when(F.col("df1."+col) != F.col("df2."+col), F.create_map("df1."+col,"df2."+col)).alias(col) for col in df1.schema.names if col!="key_col"])\
 .withColumn("merged", F.map_concat(*[F.coalesce(F.col(col), F.create_map().cast("map<string,string>")) for col in df1.schema.names if col!="key_col"]))\
 .withColumn("mismatched_column_names", F.array(*[F.when(F.col(col).isNotNull(), F.lit(col)) for col in df1.schema.names if col!="key_col"]))\
 .withColumn("mismatched_column_names", F.expr('filter(mismatched_column_names, x -> x is not null)'))\
 .withColumn("mismatched_values_in_first_df", F.map_keys("merged"))\
 .withColumn("mismatched_values_in_second_df", F.map_values("merged"))\
 .filter(F.size("mismatched_values_in_second_df") != 0)\
 .select("df1.key_col", "mismatched_column_names", "mismatched_values_in_first_df", "mismatched_values_in_second_df")\
 .show(truncate=False)

输入:
DF1:

DF2:

输出:

相关问题