Python Pandas多处理选取错误

hlswsv35  于 2023-03-11  发布在  Python
关注(0)|答案(1)|浏览(118)

我在一个文件夹中有多个.csv文件。每个.csv文件都有来自股票列表的交易数据。我想从每个.csv文件中提取特定的数据部分(在本例中是从“BABA”股票代码中提取的),然后将多天的数据部分组合起来。由于全局解释器锁定,使用标准的For循环对150个.csv文件执行此操作需要大约15分钟。
目标:使用多处理加速For循环
问题:使用多处理时,我收到错误:属性错误:无法pickle局部对象“main.locals.compile”底部错误的完整追溯。
下面的代码速度较慢,但使用for循环可以正常工作:

def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using FOR LOOP ===

    for file in file_list:
        compile(file)

#=== combine dataframes from list into one dataframe and export

stock_list_merged = pd.concat(stock_list)
stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

当我将for循环更改为多处理代码时,收到错误。

def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using MULTIPROCESSING ===

    pool = Pool(processes = (multiprocessing.cpu_count()-1))
    results = pool.map(compile, file_list)
    pool.close()
    pool.join()
    results_df = pd.concat(results)

#=== combine dataframes from list into one dataframe and export

stock_list_merged = pd.concat(stock_list)
stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

错误追溯: 追溯(最近调用最后调用):文件“/用户/andrewbochat/桌面/选项数据/practicemultiprocessing.py“,第54行,在main()文件“/用户/andrewbochat/桌面/选项数据/practice multiprocessing.py”,第41行,在主结果=pool.map(编译,文件_列表)文件“/用户/andrewbochat/opt/anaconda 3/lib/python3.9/multiprocessing/pool.py“,第364行,在Map中返回self._map_async(函数、可迭代、Map星、块大小).get()文件“/用户/安德鲁波查/opt/anaconda 3/lib/python3.9/多处理/pool.py”,第771行,在获取提升自身中。_value文件“/用户/安德鲁波查/opt/anaconda 3/lib/python3.9/多处理/pool.py”,第537行,在_句柄_任务放置(任务)文件“/用户/安德鲁波查/opt/anaconda 3/lib/python3.9/多处理/ www.example.com ”,第211行,在发送自己。_send_bytes(_ForkingPickler. dump(obj))文件“/用户/andrewbochat/opt/anaconda 3/lib/python3.9/多处理/reduction.py“,第51行,在转储文件cls(buf,protocol).dump(obj)中属性错误:无法pickle局部对象“main.locals.compile”

dly7yett

dly7yett1#

我不确定您是否真的将导入和其他函数嵌套在def main()中-或者这是否是代码格式问题?
您需要将代码格式化为如下格式:

import glob
import pandas as pd
import numpy as np
from   multiprocessing import Pool

def compile(file):
    df = pd.read_csv(file)
    df = df.loc[df['UnderlyingSymbol'] == 'BABA'] 

    def market_delta():
        return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
    df['DMarket'] = market_delta().round(4)

    return df

def main():
    path = '/Users/DataFiles'
    file_list = glob.glob(path + '/*.csv')

    with Pool() as pool:
        results_df = pd.concat(pool.map(compile, file_list))

    print(results_df)
    results_df.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__': 
    main()

相关问题