pyspark 两个Spark Dataframe 中的交叉时间段

zpgglvta  于 2023-03-01  发布在  Spark
关注(0)|答案(1)|浏览(129)

我有一个疑问要问,这对我的智力是一个挑战。
我有两个Spark Dataframe 。在df1中,我有以下结构
| 组Idx|启动|结束|
| - ------|- ------|- ------|
| 一百|2023年1月1日00时00分|2023年1月1日12时00分|
| 一百零一|2023年1月1日12时00分|2023年1月1日13时00分|
| 一百零二|2023年1月1日13时00分|2023年1月1日13时15分|
| 一百零三|2023年1月1日13时15分|2023年1月3日11时00分|
等等。
df2中,我有这样一个示例结构:
| 目录|启动|结束|价值|
| - ------|- ------|- ------|- ------|
| A类|2023年1月1日10时00分|2023年1月1日10时50分|一点零九二|
| A类|2023年1月1日10时50分|2023年1月1日18:02:00|三、二|
| A类|2023年1月1日18:02:00|2023年1月2日07时15分|0.1分|
| A类|2023年1月2日07时15分|2023年1月5日04:07:00|0.3分|
| 乙|2023年1月1日07时42分|2023年1月1日08时50分|五、五|
| 乙|2023年1月1日08时50分|2023年1月1日13:02:00|四、一|
| 乙|2023年1月1日13:02:00|2023年1月4日12时10分|0.7分|
等等。
现在的目标是使用在df1内编码的周期(即startend时间),并计算每个类别的value的平均值(A,B,...)in df2,使用该 Dataframe 中的周期作为权重。对于df2中的每个类别,我希望具有df1的副本,该副本具有附加列average,该列包含value的加权平均值,该加权平均值用于编码在df2中的该类别。这个权重取决于两个时段的重叠程度。例如,对于索引为101(一月一日00:00 - 12:00)且类别为A的组,值1.092应获得50min/(12 * 60min)的权重,这里的分子为10:50 - 10:00,分母为12:00 - 00:00。忽略df2中的时段与df1中的时段不完全重叠,这只是我的示例的问题,意思是,您可以假设它们将重叠。
我在panda中通过df2df1行上的循环实现了这一点:

for categ in categories:
    pdf2 = spark_df2.where(f"categ=='{categ}'").toPandas() # take subset of df2 data for this category
    pdf2.sort_values("start")
    pdf_out = pdf1.copy()
    raw = {}
    for idx, row_df1 in pdf1.iterrows():
        r_df1   = (row_df1["end"]-row_df1["start"]).total_seconds() # duration of df1 period
        dfx     = pdf2[(pdf2["start"]<=row_df1["end"]) & (pdf2["end"]>row_df1["start"])] # subset of df2 data overlapping with that period in df1
        vals    = []
        weights = []
        for jdx, row_df2 in dfx.iterrows():
            start = row_df2["start"] if row_df2["start"]>=row_df1["start"] else row_df1["start"] # truncate df2 period at starting point of df1 if df2 start is earlier
            end   = row_df2["end"  ] if row_df2["end"  ]<=row_df1["end"  ] else row_df1["end"  ] # truncate df2 period at end point of df1 if df2 end is later
            r_df2 = (end-start).total_seconds() # duration of df2 data
            vals   .append(row_df2["avlue"])
            weights.append(r_df2/r_df1     )
        raw[row_df2["groupIdx"]] = 0 if len(vals)==0 else np.average(vals, weights=weights)
    pdf_out["average"] = imonGroups["groupIdx"].map(raw)
    print(pdf_out)
    break ## only first category for now

正如你所看到的,我为df2中的每个类别复制了一个df1,并添加了一个新列average来包含该类别在每个周期的平均值。我敢打赌,即使在Pandas中也有更好的方法来实现这一点。但关键是,对于一个类别来说,它已经非常慢了。因为我在df2中有数十亿的条目,总共超过4300个类别。
我的问题:有没有更有效的方法来实现这一点,最好使用复杂的SQL语句来利用它的计算优势并避开嵌套循环?
谢谢您的建议!
问候你conni

lrpiutwd

lrpiutwd1#

供参考:jqurious基本上解决了,把两个 Dataframe 做一个join,然后运行他发的命令,我在这里加一下记账:

duckdb.sql("""
from df1, df2
select 
   *,
   abs(epoch(least(df1.end, df2.end) - greatest(df1.start, df2.start))::float) as seconds2,
   epoch(df1.end - df1.start)::float as seconds1,
   seconds2 / seconds1 as weight
""")

然后,可以通过groupby语句为每个索引构建平均值。
再次感谢!

相关问题