pandas Dataframe 上的并行迭代

mzsu5hc0  于 2023-04-04  发布在  其他
关注(0)|答案(1)|浏览(156)

我有一个函数,我想通过一组索引来迭代pandas Dataframe 。

import pandas as pd
df = pd.DataFrame({"colA": ["x"]*5+["y"]*5,
                "colB": ["a"]*2+["b"]*2+["c"]*2+["d"]*2+["e"]*2, 
                "colC": range(10)})
def example(df):
    df['colD']=df['colC']*2
    return df
def example_by_group(df, group):
    (n,m) = group
    df_temp = df[df.colA.eq(n) & df.colB.eq(m)]
    df_temp = example(df_temp)
    return df_temp
def my_function(df):
    result = pd.DataFrame()
    for _, group in df[['colA','colB']].drop_duplicates().iterrows():
        temp = example_by_group(df,group)
        result = pd.concat([result,temp])
    return result
my_function(df)

我的真实的案例非常耗时,所以我尝试像这样并行化:

from joblib import Parallel, delayed
def my_parallel_function(df, n_jobs):
    paral_execute = Parallel(n_jobs)
    parallel_function =  delayed(example_by_group)

    execution_list =(parallel_function(df, group) for _ , group in df[['colA','colB']].drop_duplicates().iterrows())

    result_list = paral_execute(execution_list)
    return pd.concat(result_list)
my_parallel_function(df, 2)

然而,我的并行函数比基本函数慢。我的 Dataframe 有大约700k行,分为大约1400组。
出于某种原因,我想避免安装其他库(dask,polars等)。
非常感谢

gev0vcfq

gev0vcfq1#

恕我直言,我认为你的代码效率不高。所以在使用多处理之前,请尝试优化你的顺序代码。我不知道你的真实的代码,但以下代码:

for _, group in df[['colA','colB']].drop_duplicates().iterrows():
...
df_temp = df[df.colA.eq(n) & df.colB.eq(m)]

显然是groupby

df.groupby(['colA', 'colB'])

所以你不需要手动创建数据框的1400个副本,让Pandas为你处理组(它不是视图,但显然不是完整的副本)。
另一点,如果你只计算一列,为什么要返回整个 Dataframe ?返回一个Series
尝试使用multiprocessing模块的简单版本:

import pandas as pd
import multiprocessing as mp

def parallel_function(groupname, subdf):
    print(groupname)
    return subdf['colC'] * 2
    
with mp.Pool(mp.cpu_count()) as pool:
    data = pool.starmap(parallel_function, df.groupby(['colA', 'colB']))
df['colD'] = pd.concat(data)

相关问题