pyspark 使用时间间隔透视表

w1jd8yoj  于 2022-11-21  发布在  Spark
关注(0)|答案(1)|浏览(132)

所以我有一个这样的pySpark DataFrame:

+------+--------------+---------------+----------+----------+
|A_ID  |B_ID          |C_ID           | BEGIN    |   END    |
+------+--------------+---------------+----------+----------+
| 55264|     12       |         4     |2001-01-01|2012-08-05|
| 54897|     12       |         4     |2001-11-01|2012-11-30|
|  8222|     12       |         5     |2001-08-01|2012-12-31|
| 25001|     12       |         4     |2001-10-01|2015-08-30|
| 40001|     12       |         5     |2001-11-01|2012-12-31|
|  8217|     12       |         5     |2001-05-01|2020-12-31|
| 40002|     12       |         5     |2001-05-01|2009-05-01|
| 25002|     12       |         4     |2001-10-01|2020-05-30|
|  8073|     13       |         3     |2002-05-05|2003-05-04|
...

我希望使用多索引透视DataFrame。我希望有两个垂直ID- A_ID和B_ID。在水平行中应该是周,从最早的开始日期开始。值将基于C_ID,其中0表示在特定周中没有任何C_ID,1表示C_ID在本周中有一些值。2表示A_ID/B_ID行在特定周中有多个唯一的C_ID。如果能有一些关于C_ID的所有过去天数的信息,那就太好了。
最后它可能看起来像这样:

+-----+-----+-----+-----+-----+
|    Weeks  | w1  | w2  | w3  | ....
+-----+-----+
|B_ID | A_ID|
+-----+-----+------------------
| 12  |55264|  0  |  1  |  1  |
|     |82226|  2  |  1  |  0  |
|     |80732|
|     |55264|
|     |40001|
|     |54897|       etc...
| 13  |80732|
|     |32444|
...

我该怎么做?

wlzqhblo

wlzqhblo1#

复制数据

import pyspark.sql.functions as F 
from pyspark.sql import Window

cols = ["A_ID", "B_ID", "C_ID", "BEGIN", "END"]

data = [(55264, 12, 4, "2001-01-01", "2012-08-05"),
    (54897, 12, 4, "2001-11-01", "2012-11-30"),
    (8222,  12, 5, "2001-08-01", "2012-12-31"),
    (40001, 12, 5, "2001-11-01", "2012-12-31"),
    (8217,  12, 5, "2001-05-01", "2020-12-31"),
    (40002, 12, 5, "2001-05-01", "2009-05-01"),
    (25002, 12, 4, "2001-10-01", "2020-05-30"),
    (8073,  13, 3, "2002-05-05", "2003-05-04")]

df_data = (spark.createDataFrame(data, schema=cols)
            .select("A_ID", "B_ID", "C_ID", 
                     F.col("BEGIN").cast("date"),
                     F.col("END").cast("date")))

计算和透视:

1)如果希望列中的所有周都从第一周开始,请创建df_weeks,然后在透视之前与生成的 Dataframe 联接。如果希望列编号,请使用weeks_no在下面的2)中进行groupby和透视:
w = Window.orderBy("week")
df_weeks = (df_data
             .agg(F.min(F.date_trunc("week", "BEGIN")).cast("date").alias("start"), 
                  F.max(F.date_trunc("week", "END")).cast("date").alias("end"))
        .withColumn("week", F.explode(F.expr('sequence(start, end, interval 1 week)')))
        .withColumn("week_no", F.row_number().over(w))
       ).select("week", "week_no")
2)为开始和END之间的每个日期创建行,使用date_trunc截断以获取每个日期的周开始,按周、A_ID、B_ID分组,并对不同的C_ID值进行计数,使用df_dates连接,然后透视空值并使用0填充空值:
df = (df_data
       .withColumn("dates", F.explode(F.expr('sequence(BEGIN, END, interval 1 week)')))
       .withColumn("week", F.date_trunc("week", "dates").cast("date"))
       .join(df_weeks, "week", "right")
        .groupBy("week", "A_ID", "B_ID")
        .agg(F.countDistinct("C_ID").alias("count"))
        .groupBy("A_ID", "B_ID")
        .pivot("week")
        .agg(F.first("count"))
        .fillna(0))

相关问题