ubuntu Python多处理打开的文件太多

kq0g1dla  于 2023-06-21  发布在  Python
关注(0)|答案(1)|浏览(117)

我在Ubuntu上使用Python 3.8.10,试图在pandas数据框的行上运行一个函数-该函数获取行中的数据并使用它为新的数据框创建更多(确切地说是84)行。输入 Dataframe 具有超过15 M行,因此这是串行运行的缓慢过程。为了并行处理,我使用以下代码:

from multiprocessing import Pool, Process, Queue

def expandDates(row):
    expd = do stuff
    return resExpand.put(expd)

resExpand, results = Queue(), []

procs = []
for row in thisData.itertuples():
    p = Process(target=expandDates, args=[row])
    p.start()
    procs.append(p)
for proc in procs:
    results.append(resExpand.get())
    proc.join()
    proc.close()

这给了我一个操作系统错误-打开的文件太多,所以我想我应该分块处理 Dataframe ,给我这个:

chunkSize = 500
chunks = math.ceil(len(thisData)/chunkSize)
chunkIndices = list(range(0, chunks*chunkSize, chunkSize))
for (indx, (fm, to)) in enumerate(zip(chunkIndices, chunkIndices[1:])):
    # get the chunk
    thisChunk = thisData.iloc[fm:to]
    # process
    procs = []
    for row in thisChunk.itertuples():
        p = Process(target=expandDates, args=[row])
        p.start()
        procs.append(p)
    for proc in procs:
        results.append(resExpand.get())
        proc.join()
        proc.close()

据我所知,这应该将多处理限制在一次500行,这应该远远低于可以打开的文件数量的限制。我错过了什么?我都不知道怎么调试这个。谢谢

ubbxdtey

ubbxdtey1#

你会有一个更好的时间与multiprocessing.Pool;选择一些并行工作线程(或者不选择,让Python查看您拥有的CPU核心数量),并提交您的工作,然后处理结果:

import multiprocessing

def expandDates(row):
    return row, do_stuff(row)

def main():
    with multiprocessing.Pool(8) as pool:
        for row, result in pool.imap(expandDates, this_data.itertuples()):
            print(row, result)

相关问题