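I have multiple .csv files in a folder. Each .csv file contains trading data for a list of stocks. I want to extract a specific section of data from each .csv file (in this case, the rows for the ticker "BABA") and then combine the sections from multiple days. Because of the global interpreter lock, doing this for 150 .csv files with a standard for loop takes about 15 minutes.
Goal: speed up the for loop using multiprocessing.
Problem: when I use multiprocessing, I get the error AttributeError: Can't pickle local object 'main.<locals>.compile'. The full traceback is at the bottom.
The code below is slow, but it works with a for loop: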
def main():
    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool
    path = '/Users/DataFiles'  # multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []
    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA']  # select a certain section of data for ticker from .csv file
        #=== make some changes to dataframe ===
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)
        #=== append dataframes to list ===
        stock_list.append(df)
    #=== using FOR LOOP ===
    for file in file_list:
        compile(file)
    #=== combine dataframes from list into one dataframe and export
    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index = False)
if __name__ == '__main__':
    main()
When I change the for loop to multiprocessing code, I get the error:
def main():
    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool
    path = '/Users/DataFiles'  # multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []
    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA']  # select a certain section of data for ticker from .csv file
        #=== make some changes to dataframe ===
        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)
        #=== append dataframes to list ===
        stock_list.append(df)
    #=== using MULTIPROCESSING ===
    pool = Pool(processes = (multiprocessing.cpu_count()-1))
    results = pool.map(compile, file_list)
    pool.close()
    pool.join()
    results_df = pd.concat(results)
    #=== combine dataframes from list into one dataframe and export
    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index = False)
if __name__ == '__main__':
    main()
Error traceback:
Traceback (most recent call last):
  File "/Users/andrewbochat/Desktop/Option Data/practice multiprocessing.py", line 54, in <module>
    main()
  File "/Users/andrewbochat/Desktop/Option Data/practice multiprocessing.py", line 41, in main
    results = pool.map(compile, file_list)
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'main.<locals>.compile'
1 Answer
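For context on the error message: Pool.map has to pickle the function it is given in order to send it to the worker processes, and pickle stores a function by its qualified name rather than by value. A function defined inside another function has a name like 'main.<locals>.compile', which cannot be looked up again from the module, so pickling fails. The following is a minimal sketch of that behavior using pickle directly; the names top_level, make_local and can_pickle are made up for illustration and are not part of the code in the question.

```python
import pickle


def top_level(x):
    """Defined at module level, so pickle can reference it by name."""
    return x * 2


def make_local():
    """Return a function defined inside another function."""
    def local(x):  # qualified name is 'make_local.<locals>.local'
        return x * 2
    return local


def can_pickle(func):
    """Return True if pickle can serialize func, False otherwise."""
    try:
        pickle.dumps(func)
        return True
    except (AttributeError, pickle.PicklingError):
        # Python 3.9 raises AttributeError here; both are caught
        # in case another version raises PicklingError instead.
        return False


print(can_pickle(top_level))     # True
print(can_pickle(make_local()))  # False
```

The same rule applies to compile in the question: as long as it is defined inside main(), it cannot be handed to pool.map.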
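I'm not sure whether you really nested the imports and the other functions inside def main(), or whether that is just a code-formatting issue. You need to structure the code like this: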