Advanced filtering in PySpark

iq0todco  posted on 2023-04-05  in Spark

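I am currently running some calculations on a large database that holds detailed information about how borrowers repay their loans. On the technical side, I am using PySpark, and I have run into the question of how to do more advanced filtering.
For example, my DataFrame looks like this: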

Name    ID     ContractDate LoanSum Status
Boris   ID3    2022-10-10   10      Closed 
Boris   ID3    2022-10-15   10      Active
Boris   ID3    2022-11-22   15      Active
John    ID1    2022-11-05   30      Active
Martin  ID6    2022-12-10   40      Closed
Martin  ID6    2022-12-12   40      Active
Martin  ID6    2022-07-11   40      Active

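I need to build a DataFrame containing all loans issued to a given borrower (grouped by ID) where two loans under the same ID were issued within 5 days of each other and have the same loan amount.
In other words, I need to get the following table: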

Name    ID     ContractDate LoanSum Status
Boris   ID3    2022-10-10   10      Closed 
Boris   ID3    2022-10-15   10      Active
Martin  ID6    2022-12-10   40      Closed
Martin  ID6    2022-12-12   40      Active

What should I do to run this filter?
Thanks in advance.

5n0oy7gb  #1

Your DataFrame (df):

+------+---+------------+-------+------+
|  Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3|  2022-10-10|     10|Closed|
| Boris|ID3|  2022-10-15|     10|Active|
| Boris|ID3|  2022-11-22|     15|Active|
|  John|ID1|  2022-11-05|     30|Active|
|Martin|ID6|  2022-12-10|     40|Closed|
|Martin|ID6|  2022-12-12|     40|Active|
|Martin|ID6|  2022-07-11|     40|Active|
+------+---+------------+-------+------+
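
To reproduce the examples, the sample data can be loaded roughly as follows. This is only a sketch: `COLUMNS`, `ROWS`, and `build_df` are names made up here, and the function assumes you already have a SparkSession to pass in.

```python
# Sample rows copied from the question; ContractDate is kept as a string.
COLUMNS = ["Name", "ID", "ContractDate", "LoanSum", "Status"]
ROWS = [
    ("Boris", "ID3", "2022-10-10", 10, "Closed"),
    ("Boris", "ID3", "2022-10-15", 10, "Active"),
    ("Boris", "ID3", "2022-11-22", 15, "Active"),
    ("John", "ID1", "2022-11-05", 30, "Active"),
    ("Martin", "ID6", "2022-12-10", 40, "Closed"),
    ("Martin", "ID6", "2022-12-12", 40, "Active"),
    ("Martin", "ID6", "2022-07-11", 40, "Active"),
]


def build_df(spark):
    """Build the sample DataFrame from an existing SparkSession (assumed)."""
    return spark.createDataFrame(ROWS, COLUMNS)
```

With a session available, `df = build_df(spark)` gives the DataFrame shown above.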

Import the necessary packages:

from pyspark.sql.window import Window
from pyspark.sql.functions import datediff, lag, array, col, explode

Try this:

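# Compare each loan with the previous loan of the same borrower and amount
win_spec = Window.partitionBy("ID", "LoanSum").orderBy("ContractDate")

# PrevDate is null for the first loan in each partition, so DateGap is null there
df_2 = df \
    .withColumn("PrevDate", lag("ContractDate", 1).over(win_spec)) \
    .withColumn("DateGap", datediff("ContractDate", "PrevDate"))

# Keep the close pairs and emit both dates of each pair as separate rows
df_3 = df_2.filter("DateGap <= 5") \
    .withColumn("ContractDate", explode(array(col("ContractDate"), col("PrevDate")))) \
    .select("Name", "ID", "LoanSum", "ContractDate") \
    .distinct()

# Join back to recover the remaining columns
df_final = df_3 \
    .join(df_2, ["Name", "ID", "LoanSum", "ContractDate"]) \
    .drop("PrevDate", "DateGap") \
    .orderBy("ID", "ContractDate")

df_final.show()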

Output:

+------+---+------------+-------+------+
|  Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3|  2022-10-10|     10|Closed|
| Boris|ID3|  2022-10-15|     10|Active|
|Martin|ID6|  2022-12-10|     40|Closed|
|Martin|ID6|  2022-12-12|     40|Active|
+------+---+------------+-------+------+
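
To see what a `DateGap` column holds, the following plain-Python sketch mimics `lag` plus `datediff` over a window partitioned by ID and LoanSum, which is what the question's "same loan amount" condition calls for. It does not use Spark; `loans` and `date_gaps` are illustrative names only.

```python
from datetime import date
from itertools import groupby

# (ID, LoanSum, ContractDate) triples from the question's sample data
loans = [
    ("ID3", 10, date(2022, 10, 10)),
    ("ID3", 10, date(2022, 10, 15)),
    ("ID3", 15, date(2022, 11, 22)),
    ("ID1", 30, date(2022, 11, 5)),
    ("ID6", 40, date(2022, 12, 10)),
    ("ID6", 40, date(2022, 12, 12)),
    ("ID6", 40, date(2022, 7, 11)),
]


def date_gaps(rows):
    """Days since the previous loan within each (ID, LoanSum) group."""
    out = []
    ordered = sorted(rows)  # sorts by ID, then LoanSum, then date
    for _, group in groupby(ordered, key=lambda r: (r[0], r[1])):
        prev = None
        for loan_id, amount, day in group:
            # The first row of a group has no predecessor, like lag() returning null
            gap = None if prev is None else (day - prev).days
            out.append((loan_id, amount, day.isoformat(), gap))
            prev = day
    return out


for row in date_gaps(loans):
    print(row)
```

Only the 2022-10-15 loan of ID3 (gap 5) and the 2022-12-12 loan of ID6 (gap 2) pass a `<= 5` test; exploding each of them together with its previous date yields the four rows in the output.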

q3aa0525  #2

Code

from pyspark.sql import Window
from pyspark.sql import functions as F

# Create a window spec
w = Window.partitionBy('Name', 'ID', 'LoanSum').orderBy('ContractDate')

# Day differences to the previous loan (x) and to the next loan (y);
# y is zero or negative, hence the abs() below
x = F.datediff('ContractDate', F.lag('ContractDate').over(w))
y = F.datediff('ContractDate', F.lead('ContractDate').over(w))

# Boolean condition to keep the rows that have a neighbouring loan
# (previous or next) within 5 days
mask = (F.abs(x) <= 5) | (F.abs(y) <= 5)

# Make sure to convert ContractDate to date type
result = df.withColumn('ContractDate', F.to_date('ContractDate'))

# Window functions cannot be used directly in filter(), so materialize the
# mask as a column first, filter on it, then drop it
result = result.select('*', mask.alias('mask')).filter('mask').drop('mask')

Result

+------+---+------------+-------+------+
|  Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3|  2022-10-10|     10|Closed|
| Boris|ID3|  2022-10-15|     10|Active|
|Martin|ID6|  2022-12-10|     40|Closed|
|Martin|ID6|  2022-12-12|     40|Active|
+------+---+------------+-------+------+
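
Because the mask looks at both neighbours, each row is evaluated exactly once, so a loan in the middle of a chain of close loans is kept without being duplicated. A plain-Python sketch of the same rule on one already-sorted partition (`keep_flags` and `max_gap` are illustrative names, not PySpark API):

```python
from datetime import date


def keep_flags(days, max_gap=5):
    """For sorted dates in one partition, flag rows with a neighbour within max_gap days."""
    flags = []
    for i, day in enumerate(days):
        # Previous neighbour, if any (mirrors the lag-based difference)
        near_prev = i > 0 and (day - days[i - 1]).days <= max_gap
        # Next neighbour, if any (mirrors the lead-based difference)
        near_next = i + 1 < len(days) and (days[i + 1] - day).days <= max_gap
        flags.append(near_prev or near_next)
    return flags


# Martin's loans of 40, sorted by ContractDate
martin = [date(2022, 7, 11), date(2022, 12, 10), date(2022, 12, 12)]
print(keep_flags(martin))  # [False, True, True]
```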
