我在Ubuntu上使用Python 3.8.10,试图在pandas数据框的行上运行一个函数-该函数获取行中的数据并使用它为新的数据框创建更多(确切地说是84)行。输入 Dataframe 具有超过15 M行,因此这是串行运行的缓慢过程。为了并行处理,我使用以下代码:
from multiprocessing import Pool, Process, Queue
def expandDates(row):
expd = do stuff
return resExpand.put(expd)
resExpand, results = Queue(), []
procs = []
for row in thisData.itertuples():
p = Process(target=expandDates, args=[row])
p.start()
procs.append(p)
for proc in procs:
results.append(resExpand.get())
proc.join()
proc.close()
这给了我一个操作系统错误-打开的文件太多,所以我想我应该分块处理 Dataframe ,给我这个:
chunkSize = 500
chunks = math.ceil(len(thisData)/chunkSize)
chunkIndices = list(range(0, chunks*chunkSize, chunkSize))
for (indx, (fm, to)) in enumerate(zip(chunkIndices, chunkIndices[1:])):
# get the chunk
thisChunk = thisData.iloc[fm:to]
# process
procs = []
for row in thisChunk.itertuples():
p = Process(target=expandDates, args=[row])
p.start()
procs.append(p)
for proc in procs:
results.append(resExpand.get())
proc.join()
proc.close()
据我所知,这应该将多处理限制在一次500行,这应该远远低于可以打开的文件数量的限制。我错过了什么?我都不知道怎么调试这个。谢谢
1条答案
按热度按时间ubbxdtey1#
你会有一个更好的时间与
multiprocessing.Pool
;选择一些并行工作线程(或者不选择,让Python查看您拥有的CPU核心数量),并提交您的工作,然后处理结果: