我有一个函数,我想通过一组索引来迭代pandas Dataframe 。
import pandas as pd
df = pd.DataFrame({"colA": ["x"]*5+["y"]*5,
"colB": ["a"]*2+["b"]*2+["c"]*2+["d"]*2+["e"]*2,
"colC": range(10)})
def example(df):
df['colD']=df['colC']*2
return df
def example_by_group(df, group):
(n,m) = group
df_temp = df[df.colA.eq(n) & df.colB.eq(m)]
df_temp = example(df_temp)
return df_temp
def my_function(df):
result = pd.DataFrame()
for _, group in df[['colA','colB']].drop_duplicates().iterrows():
temp = example_by_group(df,group)
result = pd.concat([result,temp])
return result
my_function(df)
我的真实的案例非常耗时,所以我尝试像这样并行化:
from joblib import Parallel, delayed
def my_parallel_function(df, n_jobs):
paral_execute = Parallel(n_jobs)
parallel_function = delayed(example_by_group)
execution_list =(parallel_function(df, group) for _ , group in df[['colA','colB']].drop_duplicates().iterrows())
result_list = paral_execute(execution_list)
return pd.concat(result_list)
my_parallel_function(df, 2)
然而,我的并行函数比基本函数慢。我的 Dataframe 有大约700k行,分为大约1400组。
出于某种原因,我想避免安装其他库(dask,polars等)。
非常感谢
1条答案
按热度按时间gev0vcfq1#
恕我直言,我认为你的代码效率不高。所以在使用多处理之前,请尝试优化你的顺序代码。我不知道你的真实的代码,但以下代码:
显然是
groupby
:所以你不需要手动创建数据框的1400个副本,让Pandas为你处理组(它不是视图,但显然不是完整的副本)。
另一点,如果你只计算一列,为什么要返回整个 Dataframe ?返回一个
Series
。尝试使用
multiprocessing
模块的简单版本: