PySpark: comparing column values across different DataFrames

Posted by cyvaqqii on 2023-03-17 in Spark

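We plan to do the following: compare two DataFrames, add values to the first DataFrame based on the result of the comparison, and then group the rows to get the combined data.
We are using PySpark DataFrames. Here are our DataFrames.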
Dataframe 1:

| Manager    | Department          |  isHospRelated
| --------   | --------------      | --------------
| Abhinav    | Medical             |  t
| Abhinav    | Surgery             |  t
| Rajesh     | Payments            |  t
| Sonu       | Onboarding          |  t
| Sonu       | Surgery             |  t

Dataframe 2:

| OrgSupported | OrgNonSupported |
| ------------ | --------------- |
| Medical      | Payments        |
| Surgery      | Onboarding      |

We want to compare Dataframe 1 with Dataframe 2 and get the following result:

| Manager    | Department          | Org Supported | Org NotSupported
| --------   | --------------      | ------------- | ----------------
| Abhinav    | Medical             | Medical       |        
| Abhinav    | Surgery             | Surgery       |
| Rajesh     | Payments            |               | Payments
| Sonu       | Onboarding          |               | Onboarding
| Sonu       | Surgery             | Surgery       |

Finally, we want to group the rows by manager as follows:

| Manager    | Department         | isHospRelated  | Org Supported  | Org NotSupported
| --------   | --------------     | ------------   | -------------  | ----------------
| Abhinav    | Medical,Surgery    | t              | Medical,Surgery|        
| Rajesh     | Payments           | t              |                | Payments
| Sonu       | Onboarding,Surgery | t              |  Surgery       | Onboarding

We are using PySpark in our code. Any suggestions on how to do this kind of comparison in PySpark?

qhhrdooz  1#

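Here is one solution: I left-join df2 twice, once on OrgSupported to pick up the supported values and once on OrgNonSupported to pick up the non-supported ones. After that, it is easy to group by Manager and collect the values into lists.
You can try something like this: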

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Reuse the active session, or start one if none exists
# (in the pyspark shell and most notebooks, `spark` is already defined).
spark = SparkSession.builder.getOrCreate()

df = [
    {"Manager": "Abhinav", "Department": "Medical", "isHospRelated": "t"},
    {"Manager": "Abhinav", "Department": "Surgery", "isHospRelated": "t"},
    {"Manager": "Rajesh", "Department": "Payments", "isHospRelated": "t"},
    {"Manager": "Sonu", "Department": "Onboarding", "isHospRelated": "t"},
    {"Manager": "Sonu", "Department": "Surgery", "isHospRelated": "t"},
]

df2 = [
    {"OrgSupported": "Medical", "OrgNonSupported": "Payments"},
    {"OrgSupported": "Surgery", "OrgNonSupported": "Onboarding"},
]

df = spark.createDataFrame(df)
df2 = spark.createDataFrame(df2)

# First left join: OrgSupported is filled only where Department is a supported org.
dfWithSupported = df.join(
    df2.drop("OrgNonSupported"), df.Department == df2.OrgSupported, "left"
)
# Second left join: OrgNonSupported is filled only where Department is a non-supported org.
dfWithSupportedAndNonSupported = dfWithSupported.join(
    df2.drop("OrgSupported"),
    dfWithSupported.Department == df2.OrgNonSupported,
    "left",
)

# collect_list skips nulls, so unmatched rows add nothing to the lists.
dfWithSupportedAndNonSupported.groupBy("Manager").agg(
    F.collect_list("Department").alias("Department"),
    F.collect_list("OrgSupported").alias("OrgSupported"),
    F.collect_list("OrgNonSupported").alias("OrgNonSupported"),
    F.first("isHospRelated").alias("isHospRelated"),
).show()

Output:

+-------+--------------------+------------------+---------------+-------------+
|Manager|          Department|      OrgSupported|OrgNonSupported|isHospRelated|
+-------+--------------------+------------------+---------------+-------------+
|Abhinav|  [Medical, Surgery]|[Medical, Surgery]|             []|            t|
| Rajesh|          [Payments]|                []|     [Payments]|            t|
|   Sonu|[Onboarding, Surg...|         [Surgery]|   [Onboarding]|            t|
+-------+--------------------+------------------+---------------+-------------+
