python list. sum(l)/len(l)与numpy.mean(l)的平均性能

ih99xse1  于 11个月前  发布在  Python
关注(0)|答案(4)|浏览(89)

我一直在讨论在python中计算列表平均值的最佳方法是什么。虽然我认为numpy已经优化了,但我的结果表明,在这方面,你不应该使用numpy。我想知道python为什么以及如何实现这种性能。
所以基本上我试图弄清楚为什么原生python比numpy快。
我的测试代码:

import random
import numpy as np
import timeit

def average_native(l):
    return sum(l)/len(l)

def average_np(l):
    return np.mean(l)

def test_time(func, arg):
    starttime = timeit.default_timer()
    for _ in range(500):
        func(arg)
    return (timeit.default_timer() - starttime) / 500

for i in range(1, 7):
    numbers = []
    for _ in range(10**i):
        numbers.append(random.randint(0, 100))
    print("for " + str(10**i) + " numbers:")
    print(test_time(average_native, numbers))
    print(test_time(average_np, numbers))

字符串
结果:

for 10 numbers:
2.489999999999992e-07
8.465800000000023e-06
for 100 numbers:
8.554000000000061e-07
1.3220000000000009e-05
for 1000 numbers:
7.2817999999999495e-06
6.22666e-05
for 10000 numbers:
6.750499999999993e-05
0.0005553966000000001
for 100000 numbers:
0.0006954238
0.005352444999999999
for 1000000 numbers:
0.007034196399999999
0.0568878216


顺便说一句,我在c++中运行相同的代码,并惊讶地看到Python代码更快。测试代码:

#include <iostream>
#include <cstdlib>
#include <vector>
#include <chrono>

float calculate_average(std::vector<int> vec_of_num)
{
    double sum=0;
    uint64_t cnt=0;
    for(auto & elem : vec_of_num)
    {
        cnt++;
        sum = sum + elem;     
    }
    return sum / cnt;
}
int main()
{
    // This program will create same sequence of
    // random numbers on every program run
    std::vector<int> vec;
    for(int i = 0; i < 1000000; i++)
       vec.push_back(rand());
    auto start = std::chrono::high_resolution_clock::now();
    for(int i = 0; i < 500; i++)
       calculate_average(vec);
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> float_ms = end - start;

    std::cout << "calculate_average() elapsed time is " << float_ms.count()/500 << " ms )" << std::endl;
    return 0;
}


结果:

calculate_average() elapsed time is 11.2082 ms )


我错过了什么吗?

编辑:我运行的c++代码在一个在线编译器(probebly没有任何优化).它也不是相同的硬件,和怎么知道是怎么回事,它的服务器.运行和编译后的代码在我的设备的代码是快得多.
编辑2:因此,我在numpy函数中更改了numpy数组的代码,我们确实看到对于较小的数组/列表,原生python更好,但是在大约1000个值之后,numpy执行得更好。我真的不明白为什么。numpy有哪些优化产生了这些结果?

新成果:

for 10 numbers:
2.4540000000000674e-07
6.722200000000012e-06
for 100 numbers:
8.497999999999562e-07
6.583400000000017e-06
for 1000 numbers:
6.990799999999964e-06
7.916000000000034e-06
for 10000 numbers:
6.61604e-05
1.5475799999999985e-05
for 100000 numbers:
0.0006671193999999999
8.412259999999994e-05
for 1000000 numbers:
0.0068192092
0.0008199298000000005


也许我需要重新开始这个问题:)

jtjikinw

jtjikinw1#

C比它需要的要慢得多。
首先,对于C
代码,您正在复制vector,这可能是花费大部分时间的事情。您想要编写:

float calculate_average(const std::vector<int>& vec_of_num)

字符串
而不是

float calculate_average(std::vector<int> vec_of_num)


为了避免复制。
第二,确保你在编译时开启了优化。
对于numpy版本,您正在进行额外的转换,这会减慢您的速度。
From the docs

a: array_like
Array containing numbers whose mean is desired. If a is not an array, a conversion is attempted.


因此,无论传递给numpy.mean的是什么,都会首先转换为numpy.array,然后计算均值。制作Numpy数组可能会占用您很大一部分时间。
我建议再做两个基准测试,看看它们与你已经拥有的相比如何:
(1)C++版本没有复制,正如我上面所描述的,并确保优化是打开的。(2)Numpy版本,你传入一个numpy数组而不是Python列表。

9rygscc1

9rygscc12#

函数numpy.mean()所做的比sum()len()所做的要多得多,这就是为什么它如此“慢”。
np.mean()中包含的功能本质上是它使其成为ufunc的原因,特别是对n维数组的支持。
然而,导致简单实现和np.mean()之间速度差异的最大因素实际上是将list转换为NumPy数组。
考虑以下计算平均值的方法:

  • 这本质上就是你认为的超快
def mean_naive(seq):
    return sum(seq) / len(seq)

字符串

  • 这是标准Python库中的数字安全实现
import statistics

def mean_st(seq):
    return statistics.mean(seq)

  • 下面使用NumPy mean()函数:
import numpy as np

def mean_np(seq):
    return np.mean(seq)

  • 这与朴素方法相同,但执行到NumPy数组的转换以分解出NumPy数组转换成本:
import numpy as np

def mean_naive_conv(seq):
    np.array(seq)  # the result of the conversion is not used!
    return sum(seq) / len(seq)

  • 这是一个Numba加速版本的naïve方法作用于NumPy数组。Numba加速本质上是通过llvm的即时编译将Python代码转换为 * 优化 * 的C++代码。如果sum() / len() * 比C* 快,那么mean_naive_conv()应该优于这个。
import numpy as np
import numba as nb

@nb.njit
def mean_naive_nb(seq):
    sum_ = 0
    for x in seq:
        sum_ += x
    return sum_ / len(seq)

def mean_naive_np_nb(seq):
    seq = np.array(seq)
    return mean_naive_nb(seq)


然而,当我们用下面的代码对这些进行基准测试时:

import random

funcs = (
    mean_naive, mean_st, mean_np, mean_naive_conv, mean_naive_np_nb, only_conv)

timings = {}
for k in range(1, 20):
    n = 2 ** k
    seq = tuple(random.random() for _ in range(n))
    print(f"n = {n}, k = {k}")
    timings[n] = []
    base = funcs[0](seq)
    for func in funcs:
        res = func(seq)  # this ensures that JIT-ted code is compiled before benchmarking
        is_good = np.allclose(base, res)
        timed = %timeit -r 4 -n 8 -q -o func(seq)
        timing = timed.best * 1e6
        timings[n].append(timing)
        print(f"{func.__name__:>24}  {is_good!s:>5}  {timing:10.3f} µs")


将与以下各项一起绘制:

import pandas as pd

df = pd.DataFrame(data=timings, index=[func.__name__ for func in funcs]).transpose()
df.plot(marker='o', xlabel='Input size / #', ylabel='Best timing / µs', ylim=[0, 40000])

fig = plt.gcf()
fig.patch.set_facecolor('white')


x1c 0d1x的数据
并与:

df.plot(marker='o', xlabel='Input size / #', ylabel='Best timing / µs', ylim=[0, 600], xlim=[0, 9000])

fig = plt.gcf()
fig.patch.set_facecolor('white')



(for在较小的输入尺寸上进行一些缩放)
我们可以观察到:

  • 基于statistics的方法是目前为止最慢的。
  • 到目前为止,朴素的方法是最快的
  • 当比较所有确实有从Python list到NumPy数组的类型转换的方法时:
  • np.mean()对于较大的输入大小是最快的,这可能是因为它使用特定的优化进行了编译(我推测是最佳地使用了SIMD指令);对于较小的输入,运行时间主要取决于支持所有ufunc功能的开销
  • 对于中等输入大小,Numba加速版本是最快的;对于非常小的输入,运行时间会因调用Numba函数的少量(大致恒定)开销而延长
  • 对于非常小的输入,sum() / len()最终成为最快的

这表明sum() / len()本质上比 * 优化 * 的C++代码在数组上的速度要慢。

v1uwarro

v1uwarro3#

我的代码在最后。

结论/对问题的回答

从下面的测试结果来看,Numpy方法(至少在计算平均值时)肯定比Python内置方法快,只要你使用足够大的列表或数组
然而,我测试的Python内置方法可以在非常短的(1D-)列表或(1D-)数组中优于Numpy方法。这里它发生在包含少于150个数字的列表/数组中:所以你必须意识到这一点!
@norok2的上述回答解释了这种行为。
毫不奇怪,您还可以看到,无论函数被调用多少次,每个方法之间的相对性能都保持一致。

方法

当我浏览关于Numpy的教程时,我读到随着大小的增加,Numpy数组操作可以比内置Python列表操作的相同操作快30倍。
所以,出于好奇,我做了一个脚本来测量2个函数的执行时间(通过装饰器):
1.用于计算(此处为1D-)数组平均值的Numpy方法:

def mean_array(array):
      return np.mean(array)

字符串
1.使用Python内置函数计算均值的方法:

def mean_list(liste):
     return sum(liste)/len(liste)

  • 随机性可靠性 *

使用random模块,我生成了maxnsize列表income。每个列表income[i]包含nsize[i]随机生成的1400到10000之间的数字(我以nsize[i]一年的财务操作收入为例)。
请注意,maxnsize是每个函数(时间)测试重复的次数。这给了我一个maxnsize以上函数执行时间测量的样本,所以我可以推断出统计平均值和标准差,以加强我的测试的可靠性。
为了进一步提高我的结论的鲁棒性,并强制进行压力测试,比较不同列表大小的函数(因此分散了时间执行尺度),我还将每个nsize[i]随机化为1和参数nmaxoperations之间的值。
因此,在我们继续测量函数性能之前:所有收入,包含它们的列表以及每个列表的大小都是使用numpy.randint随机生成的。
我测试了这些参数:

  1. maxnsize =1e31e41e5
    1.对于每个maxnsize,我测试了nmaxoperations =101e2,..,1e61e7
    我注意到x是Python方法与Numpy方法的时间比。

结果

  • maxnsize = 1000
  • nmaxoperations=
  • 10x=0.2723±0.0654
  • 1e2x=0.7056±0.3035
  • 1e3x=4.4767±2.6933
  • 1e4x=23.670±11.838
  • 1e5x=42.059±14.877
  • maxnsize = 10000
  • nmaxoperations=
  • 10x=0.2290±0.0854
  • 1e2x=0.7654±0.3593
  • 1e3x=4.6912±2.9002
  • 1e4x=23.052±11.884
  • 1e5x=44.127±14.886
  • maxnsize = 100000
  • nmaxoperations=
  • 10x=0.2854±0.1108
  • 1e2x=0.7566±0.3488
  • 1e3x=4.6768±3.0187
  • 1e4x=23.835±11.870
  • 1e5x=43.780±14.326
    验证码
import numpy as np
import time

def measure_time(fonction):

    def modified_function(element):

        time_start = time.time() 
        returned = fonction(element) 
        time_end    = time.time()
        time_execution = time_end - time_start
        return time_execution, returned
    
    return modified_function


@measure_time
def mean_array(array):
    return np.mean(array)

@measure_time
def mean_liste(liste):
    return sum(liste)/len(liste)

maxnsize = 100
nmaxoperations = 1000  
nsize = np.random.randint(1,high=nmaxoperations,size=maxnsize)
time_liste = []
time_array = []
ratio_performances = []
means_liste = []
means_array = []
for i in range(0,maxnsize):
    income = np.random.randint(1400,high=10000,size=nsize[i]) 
    perf_array = tuple(mean_array(income)) 
    perf_liste = tuple(mean_liste(income)) 
    time_liste.append(perf_liste[0])
    means_liste.append(perf_liste[1])
    time_array.append(perf_array[0])
    means_array.append(perf_array[1])
    ratio_performances.append(time_liste[i]/time_array[i]) 

time_mean_liste = np.mean(time_liste)
time_mean_array = np.mean(time_array)
time_std_liste = np.std(time_liste)
time_std_array = np.std(time_array)
mean_income_liste = np.mean(means_liste)
mean_income_array = np.mean(means_array)
mean_std_liste = np.std(means_liste)
mean_std_array = np.std(means_array)
ratio_performances_mean = np.mean(ratio_performances)
ratio_performances_std = np.std(ratio_performances)

print(u"With Python\'s list : mean = {}\u00B1{} computed within {}\u00B1{} s".format(mean_income_liste,
                                                                                    mean_std_liste,
                                                                                    time_mean_liste,
                                                                                    time_std_liste))

print(u"With Numpy\'s array : mean = {}\u00B1{} computed within {}\u00B1{} s".format(mean_income_array,
                                                                                    mean_std_array,
                                                                                    time_mean_array,
                                                                                    time_std_array))

print(u"Numy took {}\u00B1{} less time to compute mean thant with Python built-in methods".format(ratio_performances_mean,ratio_performances_std))

hmmo2u0o

hmmo2u0o4#

您正在为每个调用复制数组以求平均值,这需要大量额外的时间。

#include <numeric>
#include <iostream>
#include <vector>
#include <chrono>
#include <random>

//!! pass vector by reference to avoid copies!!!!
double calculate_average(const std::vector<int>& vec_of_num)
{
    return static_cast<double>(std::accumulate(vec_of_num.begin(), vec_of_num.end(), 0)) / static_cast<double>(vec_of_num.size());
}

int main()
{
    std::mt19937 generator(1); // static std::mt19937 generator(std::random_device{}());
    std::uniform_int_distribution<int> distribution{ 0,1000 };

    // This program will create same sequence of
    // random numbers on every program run
    std::vector<int> values(1000000);

    for (auto& value : values)
    {
        value = distribution(generator);
    }

    auto start = std::chrono::high_resolution_clock::now();
    double sum{ 0.0 };

    for (int i = 0; i < 500; i++)
    {
        // force compiler to use average so it can't be optimized away
        sum += calculate_average(values);
    }

    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> float_ms = end - start;

    // force compiler to use sum so it can't be optimized away
    std::cout << "sum = " << sum << "\n"; 
    std::cout << "calculate_average() elapsed time is " << float_ms.count() / 500 << " ms )" << std::endl;
    return 0;
}

字符串

相关问题