csv Python中的多处理池不使用全部CPU

pieyvz9o  于 2023-03-27  发布在  Python
关注(0)|答案(1)|浏览(109)

我正在使用Python处理一个大的CSV文件(超过1000万行),在每行上运行一个计算操作,然后将结果保存在一个新的CSV文件中。我试图在一个4个进程的池中运行该程序(与我的服务器上的CPU核心数量相同),在前15分钟内它工作正常,保持在100%,但在执行的提醒下,它下降到60%。
下面的图表显示了这个过程。每个输入文件大约运行70分钟,但它只在15分钟内达到100%的CPU使用率峰值。我可以做些什么来加快这个过程并保持它在100%?

代码如下。资源重函数是doBigCalculation(计算并保存到另一个CSV),所以我在将它们发送到Pool进行处理之前将2000个请求保存在列表中。

p = Pool(cpu_count())

with open('input.csv', newline='') as inputFile:
    reader = csv.reader(inputFile, delimiter=',', quotechar='"')

    for row in reader:

        param1 = float(row[0])
        param2 = float(row[1])

        # This is the resource heavy task
        aqiRequest = AQIRequest(param1, param2)

        aqiRequests.append(aqiRequest)
        if(len(aqiRequests) % 2000 == 0):
            p.map(doBigCalculation, aqiRequests)
            aqiRequests = []
knpiaxh1

knpiaxh11#

在执行这样的任务时,通常会遇到一些跟踪、错误和性能分析。但第一个建议是不要自己对请求进行分块。map具有内置的分块功能,可以更有效地完成此任务,您可以使用chunksize=参数对其进行调优。此外,使用imap可以将读取重构到生成器中。它与imap组合将延迟读取csv,这将减少7000万行的RAM压力。

from multiprocessing import Pool, cpu_count

def aqi_request_generator():
    with open('input.csv', newline='') as inputFile:
        reader = csv.reader(inputFile, delimiter=',', quotechar='"')

        for row in reader:

            param1 = float(row[0])
            param2 = float(row[1])

            # This is the resource heavy task
            yield AQIRequest(param1, param2)

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

pool_size = cpu_count()
p = Pool(pool_size)
for result in p.imap(doBigCalculation,
                     aqi_request_generator(),
                     chunksize=compute_chunksize(10_000_000, pool_size)
                     ):
    # Do something with return value from doBugCalculation:
    ...

此外,如果有一些IO(您提到写入输出csv),您可能会发现通过拥有比CPU更多的进程来获得更好的性能,因为您可能会花费大量时间来写入结果。

相关问题