numpy Numba和Cython没有提供比常规Python更好的性能

yfwxisqw  于 2023-08-05  发布在  Python
关注(0)|答案(1)|浏览(84)

我有一些Python代码,我一直试图用Numba和Cython来加速。Numba大约慢2倍(一个问题是它不支持带axis参数的np.var,但这不可能是唯一的原因),而Cython的运行时间或多或少完全相同。以下是Numba版本:

import numpy as np
import pandas as pd
from itertools import product
from numba import jit, objmode, prange
import time
import cProfile
import pstats

@jit(nopython=True)
def squeeze(X, n, d):
    sum_values = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(n):
            sum_values[i][j] = np.sum(X[i, :, j])
    return sum_values

@jit(nopython=True)
def calculate_variance2(X):
    n, d, _ = X.shape
    a = np.sum(X, axis=1)
    b = np.zeros(n)
    for i in range(n):
        b[i] = np.var(a[:, i])
    return np.sum(b)

@jit(nopython=True)
def get_X_beta2(n, d, tau):
    X = np.zeros((n, d + 1, n))
    for k in prange(n):
        for j in range(d):
            X[0, j, k] = V[k, j]

    for k in prange(n):
        X[k, d, k] = -tau[k, 0]

    beta = np.zeros((d, n))
    for k in prange(n):
        for j in prange(d):
            beta[j, k] = V[k, j] / V[0, j]

    return X, beta

@jit(nopython=True)
def update_params(X, beta, alloc_mt, uu):
    j1 = np.random.randint(0, d)
    temp = np.sum(X, axis=1) - X[:, j1, :]

    I = np.argmin(np.dot(temp, beta[j1, :]))
    X[:, j1, :] = 0
    X[I, j1, :] = V[:, j1]
    alloc_mt[uu, :, j1] = 0

    return X, alloc_mt

@jit(nopython=True)
def get_mct_E(V, n, d):
    TAR = 51
    upper = 2000
    target = np.linspace(0, upper, TAR)

    envy_mc = np.zeros(TAR)
    util_mc = np.zeros(TAR)
    alloc_mt = np.zeros((TAR, n, d))

    for uu in prange(TAR):
        num_optims = 0

        tau = np.repeat(target[uu], n).reshape(n, 1)

        X, beta = get_X_beta2(n, d, tau)

        alloc_mt[uu, 0, :] = np.ones((1, d))

        counter = 0
        variance = calculate_variance2(X) / n
 
        while counter < d:
         
            X, alloc_mt = update_params(X, beta, alloc_mt, uu)
          
            variance_temp = calculate_variance2(X) / n
          

            if variance_temp < variance:
                variance = variance_temp
                counter = 0
            else:
                counter += 1
     
        X = X[:, :d, :]

        E = np.array(squeeze(X,n,d))
        envy_mc[uu] = calc_envy(E, n)
        util_mc[uu] = calc_util(E, d)
 
    print(sum(envy_mc))
    return envy_mc, util_mc, alloc_mt

def min_cov_target2(V, n, d):
    envy_mc, util_mc, alloc_mt = get_mct_E(V, n, d)

    envy_mct = np.min(envy_mc)
    min_envy_index = np.where(envy_mc == envy_mct)[0]
    m = np.argmax(util_mc[min_envy_index])
    tt = min_envy_index[m]

    envy_mct = envy_mc[tt]
    alloc_mct = np.reshape(alloc_mt[tt, :, :], (n, d))
    X = np.zeros((n, d, n))
    for k in range(n):
        X[:, :, k] = alloc_mct * np.tile(V[k, :], (n, 1))
    E_mct = np.squeeze(np.sum(X, axis=1))

    print(alloc_mct)

@jit(nopython=True)
def kroneckerproduct(A, B, row_a, col_a, row_b, col_b):
    C = np.zeros((row_a * row_b, col_a * col_b))
    for i in range(row_a):
        for j in range(col_a):
            for k in range(row_b):
                for l in range(col_b):
                    C[i * row_b + k, j * col_b + l] = A[i] * B[k, l]
    return numpy_to_list_of_lists(C)

@jit(nopython=True)
def numpy_to_list_of_lists(arr):
    if arr.ndim == 1:  # Handle 1D array separately
        return [[item for item in arr]]
    else:
        list_of_lists = []
        for i in range(arr.shape[0]):
            row_list = [arr[i, j] for j in range(arr.shape[1])]
            list_of_lists.append(row_list)
        return list_of_lists

@jit(nopython=True)
def find_max_value(arr):
    max_value = arr[0, 0]
    for i in range(arr.shape[0]):
        for j in range(arr.shape[1]):
            if arr[i, j] > max_value:
                max_value = arr[i, j]
    return max_value

@jit(nopython=True)
def calc_envy(E, n):
    A = np.diag(E)
    B = np.transpose(np.ones((1, n)))
    return find_max_value(E - \
    np.transpose(np.array(kroneckerproduct(A,B, A.shape[0],1, B.shape[0], B.shape[1])).reshape(n,n)))

@jit(nopython=True)
def calc_util(E, d):
    A = np.diag(E)
    return sum(A)

# Press the green button in the gutter to run the script.
if __name__ == '__main__':

    V = np.array([
        [121, 94, 141, 142, 63, 97, 101, 97, 41, 103],
        [193, 21, 205, 103, 195, 8, 161, 36, 10, 68],
        [23, 51, 29, 145, 144, 154, 135, 128, 18, 173],
        [159, 95, 169, 25, 167, 162, 68, 6, 143, 6]
    ])
    n = 4
    d = 10
    
    min_cov_target2(V, n, d)

字符串
Cython版本

import numpy as np
cimport numpy as cnp
cimport cython

cnp.import_array()
DTYPE = np.int64
ctypedef cnp.int64_t DTYPE_t

@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def mct(cnp.ndarray[int, ndim=2] V, int n, int d):
    cdef cnp.int64_t TAR = 51
    cdef cnp.int64_t upper = 2000
    cdef cnp.ndarray[double, ndim=1] target = np.linspace(0, upper, TAR)
    cdef cnp.int64_t envy_mct = 0
    cdef cnp.ndarray[DTYPE_t, ndim=1] min_envy_index
    cdef cnp.int64_t envy_o_index = 0
    cdef cnp.int64_t I = 0
    cdef cnp.ndarray[DTYPE_t, ndim = 3] alloc_mt = np.zeros((TAR, n, d), dtype=np.int64)
    cdef cnp.ndarray[DTYPE_t, ndim = 2] alloc_mt_out = np.zeros((n, d), dtype=np.int64)
    cdef cnp.ndarray[double, ndim=3] X = np.zeros((n, d, n))
    cdef cnp.ndarray[double, ndim=2] E = np.zeros((n, n))
    cdef cnp.ndarray[double, ndim=1] envy_mc = np.zeros(TAR)
    cdef cnp.ndarray[double, ndim=1] util_mc = np.zeros(TAR)
    cdef cnp.ndarray[double, ndim=2] tau
    cdef cnp.ndarray[double, ndim=2] beta = np.zeros((n, d))
    cdef int uu, k, j, counter, j1
    cdef cnp.ndarray[double, ndim=2] temp
    cdef double variance, temp_variance
    
    for uu in range(TAR):
        tau = np.tile(target[uu], (n, 1))

        X = np.zeros((n, d+1, n))
        for k in range(n):
            for j in range(d):
                X[0, j, k] = V[k, j]

        for k in range(n):
            X[k, d, k] = -tau[k]

        beta = np.zeros((n, d))
        for k in range(n):
            for j in range(d):
                beta[k, j] = V[k, j] / V[0, j]

        alloc_mt[uu, 0, :] = np.ones((1,d), dtype=int)

        counter = 0
        variance = np.sum(np.var(np.sum(X, axis=1), axis=0)) / n

        while counter < d:
            j1 = np.random.randint(0, d)
            temp = np.sum(X, axis=1) - X[:, j1, :]
            I = np.argmin(temp.dot(beta[:, j1]))

            X[:, j1, :] = 0
            X[I, j1, :] = V[:, j1]
            alloc_mt[uu, :, j1] = 0
            alloc_mt[uu, I, j1] = 1
            temp_variance = np.sum(np.var(np.sum(X, axis=1), axis=0)) / n
            if temp_variance < variance:
                variance = temp_variance
                counter = 0
            else:
                counter += 1
        
        X = X[:, :d, :]

        E = np.squeeze(np.sum(X, axis=1))
        util_mc[uu] = sum((np.diag(E)));
        envy_mc[uu] = np.max(np.max(E - np.kron(np.diag(E), np.transpose(np.ones((1, n))))))

    envy_mct = np.min(envy_mc)
    min_envy_index = np.where(envy_mc == envy_mct)[0]
    m = np.argmax(util_mc[min_envy_index])
    tt = min_envy_index[m]

    envy_mct = envy_mc[tt]
    alloc_mct_out = np.reshape(alloc_mt[tt, :, :], (n, d))
    return alloc_mct_out


原始的Python

def min_cov_target(V, n, d):
    TAR = 51
    upper = 2000
    target = np.linspace(0, upper, TAR)

    envy_mc = np.zeros(TAR)
    util_mc = np.zeros(TAR)
    alloc_mt = np.zeros((TAR, n, d))

    for uu in range(TAR):
        num_optims = 0
        tau = np.tile(target[uu], (n, 1))

        X = np.zeros((n, d+1, n))
        for k in range(n):
            for j in range(d):
                X[0, j, k] = V[k, j]

        for k in range(n):
            X[k, d, k] = -tau[k]

        beta = np.zeros((n, d))
        for k in range(n):
            for j in range(d):
                beta[k, j] = V[k, j] / V[0, j]

        alloc_mt[uu, 0, :] = np.ones((1,d))

        counter = 0
        variance = np.sum(np.var(np.sum(X, axis=1), axis=0)) / n

        while counter < d:
            num_optims += 1
            j1 = np.random.randint(0, d)
            temp = np.sum(X, axis=1) - X[:, j1, :]
            # M = np.min(temp.dot(beta[:, j1]))
            I = np.argmin(temp.dot(beta[:, j1]))

            X[:, j1, :] = 0
            X[I, j1, :] = V[:, j1]
            alloc_mt[uu, :, j1] = 0
            alloc_mt[uu, I, j1] = 1

            temp_variance = np.sum(np.var(np.sum(X, axis=1), axis=0)) / n
            if temp_variance < variance:
                variance = temp_variance
                counter = 0
            else:
                counter += 1
        #print(num_optims)
        X = X[:, :d, :]

        E = np.squeeze(np.sum(X, axis=1))
        util_mc[uu] = sum((np.diag(E)));
        envy_mc[uu] = np.max(np.max(E - np.kron(np.diag(E), np.transpose(np.ones((1, n))))))

    envy_mct = np.min(envy_mc)
    min_envy_index = np.where(envy_mc == envy_mct)[0]
    m = np.argmax(util_mc[min_envy_index])
    tt = min_envy_index[m]

    envy_mct = envy_mc[tt]
    alloc_mct = np.reshape(alloc_mt[tt, :, :], (n, d))

    print(alloc_mct)


迁移到Numba或Cython没有任何性能改进,这是否令人惊讶?我可以做些什么来提高性能?

wbgh16ku

wbgh16ku1#

Numba不是魔杖。它不会掩盖Numpy API的滥用,你已经在你的性能实验中发现了很多。
我提供了三个例子,其中函数应该被重写以使用向量化的替代方案:
squeeze应该是

def squeeze(X: np.ndarray, n: int) -> np.ndarray:
    return X[:n, :, :n].sum(axis=1)

字符串
find_max_value应该是

def find_max_value(arr: np.ndarray) -> float:
    return arr.max()


calc_util应该是

def calc_util(E: np.ndarray) -> float:
    return E.trace()


您需要类似地重写所有其他代码,以正确使用向量化和Numpy的功能,从而消除循环的需要。

相关问题