在python/numpy中不使用for循环计算滚动加权和

k10s72fa  于 2023-01-24  发布在  Python
关注(0)|答案(1)|浏览(93)

我最近问了一个问题:NumPy convolve method has slight variance between equivalent for loop method for Volume Weighted Average Price
使用np.convolve计算滚动VWAP指标比标准for循环快得多,但得到的结果不正确,因为它遗漏了数组中的最后一项。
有没有一种方法可以在不使用for循环的情况下进行滚动加权求和?
我尝试过的:

使用标准for循环(慢速)

def calc_vwap_1(price, volume, period_lookback):
    """
    Compute a rolling volume-weighted average price (VWAP) with an explicit loop.

    For every index ``i >= period_lookback`` the window covers sample ``i`` and
    the ``period_lookback`` samples before it (``period_lookback + 1`` values in
    total). The VWAP is the sum of ``price * volume`` over that window divided
    by the sum of ``volume`` over the same window.

    Parameters:
        price (numpy.ndarray): Prices.
        volume (numpy.ndarray): Volumes aligned element-wise with ``price``.
        period_lookback (int): How many earlier samples each window reaches back.

    Returns:
        numpy.ndarray: ``len(price)`` values. The first ``period_lookback``
        entries are left at 0; an entry whose window volume is not greater
        than zero is NaN.
    """
    n_samples = len(price)
    result = np.zeros(n_samples)
    for end in range(period_lookback, n_samples):
        # Inclusive window [end - period_lookback, end].
        window = slice(end - period_lookback, end + 1)
        window_volume = volume[window]
        total_volume = window_volume.sum()
        if not total_volume > 0:
            # No traded volume in the window: the average is undefined.
            result[end] = np.nan
            continue
        result[end] = (price[window] * window_volume).sum() / total_volume
    return result

same模式下使用np.convolve

def calc_vwap_2(price, volume, period_lookback):
    """
    Attempt at a rolling VWAP using ``np.convolve`` in ``'same'`` mode.

    Both ``price * volume`` and ``volume`` are convolved with a kernel of
    ``period_lookback`` ones, and entries from index ``period_lookback - 1``
    onward are the ratio of the two convolutions at the same index.

    NOTE: this does not reproduce ``calc_vwap_1``. The kernel holds
    ``period_lookback`` samples rather than ``period_lookback + 1``, and
    ``'same'`` mode positions the kernel around each index instead of ending
    the window at it.

    Parameters:
        price (numpy.ndarray): Prices.
        volume (numpy.ndarray): Volumes aligned element-wise with ``price``.
        period_lookback (int): Kernel length.

    Returns:
        numpy.ndarray: ``len(price)`` values; entries before index
        ``period_lookback - 1`` are 0, entries whose summed volume is not
        greater than zero are NaN.
    """
    kernel = np.ones(period_lookback)
    tail = slice(period_lookback - 1, None)
    # Rolling sums of price*volume and of volume, cut to the filled tail.
    weighted_sum = np.convolve(price * volume, kernel, mode='same')[tail]
    volume_sum = np.convolve(volume, kernel, mode='same')[tail]
    result = np.zeros(len(price))
    result[tail] = np.where(volume_sum > 0, weighted_sum / volume_sum, np.nan)
    return result

valid模式下使用np.convolve

def calc_vwap_3(price, volume, period_lookback):
    """
    Attempt at a rolling VWAP using ``np.convolve`` in ``'valid'`` mode.

    Entry ``i`` (for ``i >= period_lookback - 1``) is the sum of
    ``price * volume`` over the ``period_lookback`` samples ending at ``i``
    divided by the sum of ``volume`` over the same samples.

    NOTE: the window is one sample shorter than the ``period_lookback + 1``
    samples used by ``calc_vwap_1``, so the two functions disagree.

    Parameters:
        price (numpy.ndarray): Prices.
        volume (numpy.ndarray): Volumes aligned element-wise with ``price``.
        period_lookback (int): Kernel length.

    Returns:
        numpy.ndarray: ``len(price)`` values; entries before index
        ``period_lookback - 1`` are 0, entries whose summed volume is not
        greater than zero are NaN.
    """
    kernel = np.ones(period_lookback)
    # 'valid' keeps only positions where the kernel fully overlaps the data.
    numerator = np.convolve(price * volume, kernel, mode='valid')
    denominator = np.convolve(volume, kernel, mode='valid')
    has_volume = denominator > 0
    result = np.zeros(len(price))
    first_filled = period_lookback - 1
    result[first_filled:] = np.where(has_volume, numerator / denominator, np.nan)
    return result

使用np.cumsum(抱歉,妈妈)进行切片

def calc_vwap_4(price, volume, period_lookback):
    """
    Attempt at a rolling VWAP using ``np.cumsum``.

    Entry ``i`` (for ``i >= period_lookback - 1``) is the cumulative sum of
    ``price * volume`` up to ``i`` divided by the cumulative sum of ``volume``
    up to ``i``.

    NOTE: the sums always start at index 0, so this is a cumulative VWAP rather
    than a rolling one and it does not match ``calc_vwap_1``.

    Parameters:
        price (numpy.ndarray): Prices.
        volume (numpy.ndarray): Volumes aligned element-wise with ``price``.
        period_lookback (int): Determines the first filled index
            (``period_lookback - 1``).

    Returns:
        numpy.ndarray: ``len(price)`` values; entries before index
        ``period_lookback - 1`` are 0, entries whose cumulative volume is not
        greater than zero are NaN.
    """
    first_filled = period_lookback - 1
    # Running totals from the start of the arrays, cut to the filled tail.
    running_weighted = np.cumsum(price * volume)[first_filled:]
    running_volume = np.cumsum(volume)[first_filled:]
    result = np.zeros(len(price))
    result[first_filled:] = np.where(
        running_volume > 0, running_weighted / running_volume, np.nan
    )
    return result

使用np.reduceat

def calc_vwap_5(price, volume, period_lookback):
    """
    Attempt at a rolling VWAP using ``np.add.reduceat``.

    ``reduceat`` is called with start offsets ``0, period_lookback,
    2 * period_lookback, ...``, so it sums consecutive non-overlapping blocks.
    The first ``period_lookback - 1`` block sums are dropped and the ratio of
    the remaining sums is assigned to the tail of the output.

    NOTE: this does not match ``calc_vwap_1``, whose windows overlap. The
    number of remaining block sums only fits the target slice when it equals
    the slice length or can be broadcast to it (for example a single value).

    Parameters:
        price (numpy.ndarray): Prices.
        volume (numpy.ndarray): Volumes aligned element-wise with ``price``.
        period_lookback (int): Block length and first-filled-index offset.

    Returns:
        numpy.ndarray: ``len(price)`` values; entries before index
        ``period_lookback - 1`` are 0, entries whose block volume is not
        greater than zero are NaN.
    """
    block_starts = np.arange(0, len(price), period_lookback)
    first_filled = period_lookback - 1
    # One sum per block of `period_lookback` samples, minus the leading blocks.
    block_weighted = np.add.reduceat(price * volume, block_starts)[first_filled:]
    block_volume = np.add.reduceat(volume, block_starts)[first_filled:]
    result = np.zeros(len(price))
    result[first_filled:] = np.where(
        block_volume > 0, block_weighted / block_volume, np.nan
    )
    return result

使用np.lib.stride_tricks.as_strided

def calc_vwap_6(price, volume, period_lookback):
    """
    Attempt at a rolling VWAP using ``np.lib.stride_tricks.as_strided``.

    Each array is viewed as overlapping rows of ``period_lookback`` consecutive
    samples. Entry ``i`` (for ``i >= period_lookback - 1``) is the row sum of
    ``price * volume`` ending at ``i`` divided by the matching row sum of
    ``volume``.

    NOTE: the window is one sample shorter than the ``period_lookback + 1``
    samples used by ``calc_vwap_1``, so the two functions disagree.

    Parameters:
        price (numpy.ndarray): Prices.
        volume (numpy.ndarray): Volumes aligned element-wise with ``price``.
        period_lookback (int): Window length.

    Returns:
        numpy.ndarray: ``len(price)`` values; entries before index
        ``period_lookback - 1`` are 0, entries whose window volume is not
        greater than zero are NaN.
    """
    view_shape = (len(price) - period_lookback + 1, period_lookback)

    def window_totals(values):
        # Row r of the view aliases values[r:r + period_lookback].
        step = values.strides[0]
        windows = np.lib.stride_tricks.as_strided(
            values, shape=view_shape, strides=(step, step)
        )
        return windows.sum(axis=1)

    weighted_total = window_totals(price * volume)
    volume_total = window_totals(volume)
    result = np.zeros(len(price))
    result[period_lookback - 1:] = np.where(
        volume_total > 0, weighted_total / volume_total, np.nan
    )
    return result

测试数据

import numpy as np

# Random sample data: 10000 prices and 10000 volumes drawn from [0, 1).
price = np.random.random(10000)
volume = np.random.random(10000)

# Print each implementation's result for a lookback of 100, followed by a
# blank line.
# NOTE(review): calc_vwap (no suffix) is not defined anywhere in this snippet;
# presumably it comes from the earlier linked question -- confirm before running.
for vwap_fn in (
    calc_vwap,
    calc_vwap_1,
    calc_vwap_2,
    calc_vwap_3,
    calc_vwap_4,
    calc_vwap_5,
    calc_vwap_6,
):
    print(vwap_fn(price, volume, 100))
    print()

结果

vwap_1 -> [0.         0.         0.         ... 0.47375965 0.47762679 0.48448903] # CORRECT CALCULATION

vwap_2 -> [0.         0.         0.         ... 0.53108759 0.51933363 0.51360848]

vwap_3 -> [0.         0.         0.         ... 0.49834202 0.4984141  0.49845759]

vwap_4 -> [0.         0.         0.         ... 0.49834202 0.4984141  0.49845759]

vwap_5 -> [0.         0.         0.         ... 0.48040529 0.48040529 0.48040529]

vwap_6 -> [0.         0.         0.         ... 0.47027032 0.48009596 0.48040529]
tkclm6bt

tkclm6bt1#

是的,可以按以下步骤进行:

在same模式下使用np.convolve

def calc_vwap_2(price, volume, period_lookback):
   price_volume = price * volume
   # Use convolve to get the rolling sum of product of price and volume
   if period_lookback%2 == 0:
       st = period_lookback//2
       en = -period_lookback//2
   else:
       st = period_lookback//2 + 1
       en = -period_lookback//2 + 1
   price_volume_conv = np.convolve(price_volume, np.ones(period_lookback+1), mode='same')[st:en]
   # Use convolve to get the rolling sum of volume
   volume_conv = np.convolve(volume, np.ones(period_lookback+1), mode='same')[st:en]
   # Create a mask to check if the volume sum is greater than 0
   mask = volume_conv > 0
   # Initialize the vwap array
   vwap = np.zeros(len(price))
   # Use the mask to check if volume sum is greater than zero, if it is, proceed with the division and store the result in vwap array, otherwise store NaN
   vwap[period_lookback:] = np.where(mask, price_volume_conv / volume_conv, np.nan)
   return vwap

 # 1.11 ms ± 26.7 µs per loop (mean ± std. dev. of 3 runs, 100 loops each)

在valid模式下使用np.convolve

def calc_vwap_3(price, volume, period_lookback):
    # Calculate product of price and volume
    price_volume = price * volume
    # Use convolve to get the rolling sum of product of price and volume and volume array
    price_volume_conv = np.convolve(price_volume, np.ones(period_lookback+1), mode='valid')
    # Use convolve to get the rolling sum of volume
    volume_conv = np.convolve(volume, np.ones(period_lookback+1), mode='valid')
    # Create a mask to check if the volume sum is greater than 0
    mask = volume_conv > 0
    # Initialize the vwap array
    vwap = np.zeros(len(price))
    # Use the mask to check if volume sum is greater than zero, if it is, proceed with the division and store the result in vwap array, otherwise store NaN
    vwap[period_lookback:] = np.where(mask, price_volume_conv/volume_conv, np.nan)
    return vwap

# 1.22 ms ± 17.5 µs per loop (mean ± std. dev. of 3 runs, 100 loops each)

使用np.add.reduceat

def calc_vwap_5(price, volume, period_lookback):
    price_volume = price * volume
    # Use reduceat to get the rolling sum of product of price and volume
    
    indices = np.arange(0,len(price)+1)
    isz = indices.itemsize
    indx = np.lib.stride_tricks.as_strided(indices, shape=(len(price)-period_lookback, 2), strides=(isz, (period_lookback+1)*isz))
    price_volume_cumsum = np.add.reduceat(price_volume, indx.ravel()[:-1])[::2]
    # Use reduceat to get the rolling sum of volume
    volume_cumsum = np.add.reduceat(volume, indx.ravel()[:-1])[::2]
    # Create a mask to check if the volume sum is greater than 0
    mask = volume_cumsum > 0
    # Initialize the vwap array
    vwap = np.zeros(len(price))
    # Use the mask to check if volume sum is greater than zero, if it is, proceed with the division and store the result in vwap array, otherwise store NaN
    vwap[period_lookback:] = np.where(mask, price_volume_cumsum / volume_cumsum, np.nan)
    return vwap
# 1.64 ms ± 36.5 µs per loop (mean ± std. dev. of 3 runs, 100 loops each)

使用np.lib.stride_tricks.as_strided

def calc_vwap_6(price, volume, period_lookback):
    price_volume = price * volume
    s = price_volume.itemsize
    price_volume_strided = np.lib.stride_tricks.as_strided(price_volume, shape=(len(price)-period_lookback, period_lookback+1), strides=(s, s))
    vs = volume.itemsize
    volume_strided = np.lib.stride_tricks.as_strided(volume, shape=(len(price)-period_lookback, period_lookback+1), strides=(vs, vs))
    price_volume_sum = price_volume_strided.sum(axis=1)
    volume_sum = volume_strided.sum(axis=1)
    mask = volume_sum > 0
    vwap = np.zeros(len(price))
    vwap[period_lookback:] = np.where(mask, price_volume_sum / volume_sum, np.nan)
    return vwap

# 1.09 ms ± 46.1 µs per loop (mean ± std. dev. of 3 runs, 100 loops each)

测试数据

# Small random sample: 10 prices and 10 volumes drawn from [0, 1).
price = np.random.random(10)
volume = np.random.random(10)

# Print each implementation's result for a lookback of 2 so they can be
# compared line by line. calc_vwap_4 is not part of this comparison.
for vwap_fn in (calc_vwap_1, calc_vwap_2, calc_vwap_3, calc_vwap_5, calc_vwap_6):
    print(vwap_fn(price, volume, 2))

结果

vwap_1 ->[0.         0.         0.42407022 0.30592959 0.40199312 0.62537516
 0.65380957 0.63246025 0.44962312 0.57632784]
vwap_2 ->[0.         0.         0.42407022 0.30592959 0.40199312 0.62537516
 0.65380957 0.63246025 0.44962312 0.57632784]
vwap_3 ->[0.         0.         0.42407022 0.30592959 0.40199312 0.62537516
 0.65380957 0.63246025 0.44962312 0.57632784]
vwap_5 ->[0.         0.         0.42407022 0.30592959 0.40199312 0.62537516
 0.65380957 0.63246025 0.44962312 0.57632784]
vwap_6 ->[0.         0.         0.42407022 0.30592959 0.40199312 0.62537516
 0.65380957 0.63246025 0.44962312 0.57632784]

在每个函数的末尾,我注明了在10000个随机数上测得的运行时间;与提问者(OP)一致,period_lookback取100。

相关问题