Python: using multiprocessing with a maximum number of simultaneous processes

pgvzfuti · posted 2023-01-19 in Python

I have the following Python code:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

This works fine. However, MAX_PROCESSES is a variable and can be any value between 1 and 512. Since I only run this code on a machine with 8 cores, I need to find out whether it is possible to limit the number of processes allowed to run at the same time. I have looked into multiprocessing.Queue, but it does not look like what I need, or maybe I am misreading the documentation.

Is there a way to limit the number of simultaneously running multiprocessing.Process instances?

bakd9h0s #1

The most sensible approach is probably to use multiprocessing.Pool, which spawns a pool of worker processes based on the maximum number of cores available on your system, and then feeds tasks to the workers as cores become available.
The example in the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also set the number of cores manually:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))         # prints "[0, 1, 4, ..., 81]"

Also handy to know: the multiprocessing.cpu_count() function reports the number of cores on a given system, should you need that in your code.
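
For instance, a minimal sketch that explicitly caps the pool at the machine's core count (the worker function here is just a stand-in) could look like this:

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    n_cores = multiprocessing.cpu_count()  # number of cores on this machine
    # Pool() with no argument uses cpu_count() by default; shown explicitly here
    with multiprocessing.Pool(processes=n_cores) as pool:
        print(pool.map(square, range(10)))  # prints [0, 1, 4, ..., 81]
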
Edit: here is a draft of some code that seems to fit your particular case:

import multiprocessing

def f(name):
    print('hello', name)

if __name__ == '__main__':
    pool = multiprocessing.Pool()  # use all available cores, otherwise specify the number you want as an argument
    for i in range(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
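
If you also need the return values of f, a small variant of the draft above (just a sketch) can keep the AsyncResult handles returned by apply_async and collect their values with get():

import multiprocessing

def f(name):
    return 'hello {}'.format(name)

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        results = [pool.apply_async(f, args=(i,)) for i in range(512)]
        # get() blocks until the corresponding task has finished
        values = [r.get() for r in results]
    print(values[:3])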

pw9qyyiw #2

I think a Semaphore is what you are looking for; it blocks the main process once it has been counted down to 0. Sample code:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other 
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more 
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The following code is more structured, since it acquires and releases sema inside the same function. However, it will consume too many resources if total_task_num is very large:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making code more readable,
    # but may lead to problem.
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block after 20 processes
        # have been created and running, instead it will carry 
        # on until all 1000 processes are created.
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The code above creates total_task_num processes, but only concurrency of them are actually running; the rest are blocked, consuming precious system resources.
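
To see that difference, one can count the live child processes right after the start loop; a small scaled-down diagnostic sketch (50 tasks, concurrency 10, shorter sleep) using multiprocessing.active_children():

from multiprocessing import Process, Semaphore, active_children
import time

def f(name, sema):
    sema.acquire()
    time.sleep(2)
    sema.release()

if __name__ == '__main__':
    sema = Semaphore(10)
    procs = [Process(target=f, args=(i, sema)) for i in range(50)]
    for p in procs:
        p.start()
    # all 50 child processes exist, even though only 10 are past sema.acquire()
    print(len(active_children()))  # roughly 50
    for p in procs:
        p.join()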

qjp7pelc #3

More generally, it could also look like this:

import multiprocessing

def f(i, param):
    # placeholder worker; replace with your own function
    print('task', i, param)

def chunks(l, n):
    # yield successive n-sized chunks from list l
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfThreads = 4

if __name__ == '__main__':
    params = range(512)  # placeholder parameters; replace with your own
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    # start and join the processes one chunk at a time
    for chunk in chunks(jobs, numberOfThreads):
        for j in chunk:
            j.start()
        for j in chunk:
            j.join()

Of course, this approach is rather brute force (since it waits for every process in a chunk before moving on to the next chunk), but it can still work well when the individual function calls have roughly equal runtimes.

vfh0ocws #4

You can do this with concurrent.futures, using a ProcessPoolExecutor. Under the hood, the ProcessPoolExecutor uses Process and Semaphore from multiprocessing, very similar to some of the other answers here; check it out here if you want. I am adding this answer because, so far, it is the only example that uses the more recent concurrent.futures API to achieve the same thing.

from concurrent.futures import ProcessPoolExecutor, Future, wait
import typing as T

MAX_WORKERS: int = 4
INPUT_SIZE: int = 512

def f(x: int) -> int:
    return x**2

if __name__ == '__main__':
    input_vec: T.List[int] = list(range(INPUT_SIZE))

    thread_pool: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=MAX_WORKERS)

    threads: T.List[Future] = []

    print(f'Spreading {INPUT_SIZE} tasks over {MAX_WORKERS} threads...')
    for x in input_vec:
        # ProcessPoolExecutor.submit(callable, *args_to_callable) returns a Future
        threads.append(thread_pool.submit(f, x))

    # wait for the workers to complete (all Futures in a terminal state)
    wait(threads)
    print('All tasks complete!')

    output_vec: T.List[int] = [thread.result() for thread in threads]
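
As a side note, ProcessPoolExecutor also works as a context manager, and executor.map covers the same use case with less bookkeeping; a minimal sketch:

from concurrent.futures import ProcessPoolExecutor

def f(x: int) -> int:
    return x**2

if __name__ == '__main__':
    # the with-block shuts the pool down cleanly once all submitted tasks are done
    with ProcessPoolExecutor(max_workers=4) as executor:
        output_vec = list(executor.map(f, range(512)))
    print(output_vec[:5])  # [0, 1, 4, 9, 16]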
