我有一个疑问要问,这对我的智力是一个挑战。
我有两个Spark Dataframe 。在df1
中,我有以下结构
| 组Idx|启动|结束|
| - ------|- ------|- ------|
| 一百|2023年1月1日00时00分|2023年1月1日12时00分|
| 一百零一|2023年1月1日12时00分|2023年1月1日13时00分|
| 一百零二|2023年1月1日13时00分|2023年1月1日13时15分|
| 一百零三|2023年1月1日13时15分|2023年1月3日11时00分|
等等。
在df2
中,我有这样一个示例结构:
| 目录|启动|结束|价值|
| - ------|- ------|- ------|- ------|
| A类|2023年1月1日10时00分|2023年1月1日10时50分|一点零九二|
| A类|2023年1月1日10时50分|2023年1月1日18:02:00|三、二|
| A类|2023年1月1日18:02:00|2023年1月2日07时15分|0.1分|
| A类|2023年1月2日07时15分|2023年1月5日04:07:00|0.3分|
| 乙|2023年1月1日07时42分|2023年1月1日08时50分|五、五|
| 乙|2023年1月1日08时50分|2023年1月1日13:02:00|四、一|
| 乙|2023年1月1日13:02:00|2023年1月4日12时10分|0.7分|
等等。
现在的目标是使用在df1
内编码的周期(即start
和end
时间),并计算每个类别的value
的平均值(A,B,...)in df2
,使用该 Dataframe 中的周期作为权重。对于df2
中的每个类别,我希望具有df1
的副本,该副本具有附加列average
,该列包含value
的加权平均值,该加权平均值用于编码在df2
中的该类别。这个权重取决于两个时段的重叠程度。例如,对于索引为101(一月一日00:00 - 12:00)且类别为A的组,值1.092应获得50min/(12 * 60min)的权重,这里的分子为10:50 - 10:00,分母为12:00 - 00:00。忽略df2
中的时段与df1
中的时段不完全重叠,这只是我的示例的问题,意思是,您可以假设它们将重叠。
我在panda中通过df2
和df1
行上的循环实现了这一点:
for categ in categories:
pdf2 = spark_df2.where(f"categ=='{categ}'").toPandas() # take subset of df2 data for this category
pdf2.sort_values("start")
pdf_out = pdf1.copy()
raw = {}
for idx, row_df1 in pdf1.iterrows():
r_df1 = (row_df1["end"]-row_df1["start"]).total_seconds() # duration of df1 period
dfx = pdf2[(pdf2["start"]<=row_df1["end"]) & (pdf2["end"]>row_df1["start"])] # subset of df2 data overlapping with that period in df1
vals = []
weights = []
for jdx, row_df2 in dfx.iterrows():
start = row_df2["start"] if row_df2["start"]>=row_df1["start"] else row_df1["start"] # truncate df2 period at starting point of df1 if df2 start is earlier
end = row_df2["end" ] if row_df2["end" ]<=row_df1["end" ] else row_df1["end" ] # truncate df2 period at end point of df1 if df2 end is later
r_df2 = (end-start).total_seconds() # duration of df2 data
vals .append(row_df2["avlue"])
weights.append(r_df2/r_df1 )
raw[row_df2["groupIdx"]] = 0 if len(vals)==0 else np.average(vals, weights=weights)
pdf_out["average"] = imonGroups["groupIdx"].map(raw)
print(pdf_out)
break ## only first category for now
正如你所看到的,我为df2
中的每个类别复制了一个df1
,并添加了一个新列average
来包含该类别在每个周期的平均值。我敢打赌,即使在Pandas中也有更好的方法来实现这一点。但关键是,对于一个类别来说,它已经非常慢了。因为我在df2
中有数十亿的条目,总共超过4300个类别。
我的问题:有没有更有效的方法来实现这一点,最好使用复杂的SQL语句来利用它的计算优势并避开嵌套循环?
谢谢您的建议!
问候你conni
1条答案
按热度按时间lrpiutwd1#
供参考:jqurious基本上解决了,把两个 Dataframe 做一个join,然后运行他发的命令,我在这里加一下记账:
然后,可以通过
groupby
语句为每个索引构建平均值。再次感谢!