Using NVTX markers with Python multiprocessing

brccelvz · asked 2022-12-10 · Python

I am trying to use NVTX markers together with a multiprocessing pool in Python, but when an annotated function is called only from a child process, the operation does not show up in the profiling report. Is there any way around this, or is it a limitation of profiling Python child processes? Below is some example code that reproduces the issue:

import os
import time
from multiprocessing import Pool, shared_memory

import numpy as np
import nvtx

N_SAMPLES = int(1e6)
SIGNAL = np.random.randn(N_SAMPLES) + 1j * np.random.randn(N_SAMPLES)

@nvtx.annotate(color="red")
def create_shm_array(signal):
    # Store the signal in shared memory to share across processes
    shm = shared_memory.SharedMemory(create=True, size=signal.nbytes)
    shared_array = np.ndarray(signal.shape, dtype=signal.dtype, buffer=shm.buf)
    shared_array[:] = signal[:]
    return shm

def worker(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    sig = np.ndarray((N_SAMPLES,), dtype=complex, buffer=shm.buf)
    return expensive_op(sig)

@nvtx.annotate(color="blue")
def expensive_op(sig):
    time.sleep(2)
    return np.sum(sig)

def clean_shm(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    shm.close()
    shm.unlink()

if __name__ == "__main__":

    print(f"Total num_bytes: {SIGNAL.nbytes} B | {SIGNAL.nbytes / 1e9} GB")
    test = np.random.randn(10)
    expensive_op(test)
    shared_mem = create_shm_array(SIGNAL)

    with Pool(os.cpu_count()) as p:
        p.map(worker, [shared_mem.name] * 2)
    clean_shm(shared_mem.name)
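
For context, a timeline like the one below is typically captured by running the script under Nsight Systems with NVTX tracing enabled. The exact command is not shown in the question, so the invocation and the script name here are assumptions:

nsys profile --trace=nvtx -o nvtx_mp_report python nvtx_mp_example.py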

Below is the Nvidia Nsight Systems timeline. The marker appears for the first call from the parent process, but not when the function is called by the child processes.

[screenshot: Nsight Systems timeline]

Answer 1 (eqfvzcg8)

By default, Python multiprocessing forks new worker processes; we need it to spawn them instead. The likely reason is that a forked child is not followed by the profiler out of the box, whereas a spawned worker starts a fresh Python interpreter that Nsight Systems can instrument, so its NVTX ranges get recorded. Working code below.

import os
import time
from multiprocessing import Pool, shared_memory, get_context

import numpy as np
import nvtx

N_SAMPLES = int(1e6)
SIGNAL = np.random.randn(N_SAMPLES) + 1j * np.random.randn(N_SAMPLES)

@nvtx.annotate(color="red")
def create_shm_array(signal):
    # Store the signal in shared memory to share across processes
    shm = shared_memory.SharedMemory(create=True, size=signal.nbytes)
    shared_array = np.ndarray(signal.shape, dtype=signal.dtype, buffer=shm.buf)
    shared_array[:] = signal[:]
    return shm

def worker(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    sig = np.ndarray((N_SAMPLES,), dtype=complex, buffer=shm.buf)
    return expensive_op(sig)

@nvtx.annotate(color="blue")
def expensive_op(sig):
    time.sleep(2)
    return np.sum(sig)

def clean_shm(shm_name):
    shm = shared_memory.SharedMemory(name=shm_name)
    shm.close()
    shm.unlink()

if __name__ == "__main__":

    print(f"Total num_bytes: {SIGNAL.nbytes} B | {SIGNAL.nbytes / 1e9} GB")
    test = np.random.randn(10)
    expensive_op(test)
    shared_mem = create_shm_array(SIGNAL)

    with get_context("spawn").Pool(os.cpu_count()) as p:
        p.map(worker, [shared_mem.name] * 2)
    clean_shm(shared_mem.name)
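
As a side note (not part of the original answer), the same fix can be applied globally with multiprocessing.set_start_method instead of creating a per-pool context. A minimal sketch, reusing the worker, create_shm_array, clean_shm and SIGNAL definitions from the script above:

import os
from multiprocessing import Pool, set_start_method

if __name__ == "__main__":
    # set_start_method may only be called once per process, so it must run
    # under the __main__ guard before any pool or shared memory is created.
    set_start_method("spawn")

    shared_mem = create_shm_array(SIGNAL)
    with Pool(os.cpu_count()) as p:
        # The pool now uses the globally configured "spawn" start method
        p.map(worker, [shared_mem.name] * 2)
    clean_shm(shared_mem.name)

Either approach makes the pool spawn fresh interpreters rather than forking the parent.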
