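I recently asked a question: NumPy convolve method has slight variance between equivalent for loop method for Volume Weighted Average Price
Using np.convolve was much faster than the standard for loop for calculating a rolling VWAP metric, but it produced incorrect results because it omitted the last item in the array.
Is there a way to compute a rolling weighted sum without using a for loop?
What I have tried:
Using a standard for loop (slow)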
def calc_vwap_1(price, volume, period_lookback):
    """
    Calculates the volume-weighted average price (VWAP) for a given period of time.
    The VWAP is calculated by taking the sum of the product of each price and volume over a given period,
    and dividing by the sum of the volume over that period.

    Parameters:
        price (numpy.ndarray): A list or array of prices.
        volume (numpy.ndarray): A list or array of volumes, corresponding to the prices.
        period_lookback (int): The number of days to look back when calculating VWAP.

    Returns:
        numpy.ndarray: An array of VWAP values, one for each day in the input period.
    """
    vwap = np.zeros(len(price))
    for i in range(period_lookback, len(price)):
        lb = i - period_lookback  # lower bound
        ub = i + 1  # upper bound
        volume_sum = volume[lb:ub].sum()
        if volume_sum > 0:
            vwap[i] = (price[lb:ub] * volume[lb:ub]).sum() / volume_sum
        else:
            vwap[i] = np.nan
    return vwap
Using np.convolve in same mode
def calc_vwap_2(price, volume, period_lookback):
    price_volume = price * volume
    # Use convolve to get the rolling sum of product of price and volume
    price_volume_conv = np.convolve(price_volume, np.ones(period_lookback), mode='same')[period_lookback-1:]
    # Use convolve to get the rolling sum of volume
    volume_conv = np.convolve(volume, np.ones(period_lookback), mode='same')[period_lookback-1:]
    # Create a mask to check if the volume sum is greater than 0
    mask = volume_conv > 0
    # Initialize the vwap array
    vwap = np.zeros(len(price))
    # Where the volume sum is greater than zero, store the quotient; otherwise store NaN
    vwap[period_lookback-1:] = np.where(mask, price_volume_conv / volume_conv, np.nan)
    return vwap
Using np.convolve in valid mode
def calc_vwap_3(price, volume, period_lookback):
    # Calculate product of price and volume
    price_volume = price * volume
    # Use convolve to get the rolling sum of product of price and volume
    price_volume_conv = np.convolve(price_volume, np.ones(period_lookback), mode='valid')
    # Use convolve to get the rolling sum of volume
    volume_conv = np.convolve(volume, np.ones(period_lookback), mode='valid')
    # Create a mask to check if the volume sum is greater than 0
    mask = volume_conv > 0
    # Initialize the vwap array
    vwap = np.zeros(len(price))
    # Where the volume sum is greater than zero, store the quotient; otherwise store NaN
    vwap[period_lookback-1:] = np.where(mask, price_volume_conv / volume_conv, np.nan)
    return vwap
Using np.cumsum (sorry, Mum) with slicing
def calc_vwap_4(price, volume, period_lookback):
    price_volume = price * volume
    # Use cumsum to get the rolling sum of product of price and volume
    price_volume_cumsum = np.cumsum(price_volume)[period_lookback-1:]
    # Use cumsum to get the rolling sum of volume
    volume_cumsum = np.cumsum(volume)[period_lookback-1:]
    # Create a mask to check if the volume sum is greater than 0
    mask = volume_cumsum > 0
    # Initialize the vwap array
    vwap = np.zeros(len(price))
    # Where the volume sum is greater than zero, store the quotient; otherwise store NaN
    vwap[period_lookback-1:] = np.where(mask, price_volume_cumsum / volume_cumsum, np.nan)
    return vwap
Using np.reduceat
def calc_vwap_5(price, volume, period_lookback):
    price_volume = price * volume
    # Use reduceat to get the rolling sum of product of price and volume
    price_volume_cumsum = np.add.reduceat(price_volume, np.arange(0, len(price), period_lookback))[period_lookback-1:]
    # Use reduceat to get the rolling sum of volume
    volume_cumsum = np.add.reduceat(volume, np.arange(0, len(price), period_lookback))[period_lookback-1:]
    # Create a mask to check if the volume sum is greater than 0
    mask = volume_cumsum > 0
    # Initialize the vwap array
    vwap = np.zeros(len(price))
    # Where the volume sum is greater than zero, store the quotient; otherwise store NaN
    vwap[period_lookback-1:] = np.where(mask, price_volume_cumsum / volume_cumsum, np.nan)
    return vwap
Using np.lib.stride_tricks.as_strided
def calc_vwap_6(price, volume, period_lookback):
    price_volume = price * volume
    # Shape shared by both strided views: one row per window
    shape = (len(price) - period_lookback + 1, period_lookback)
    price_volume_strided = np.lib.stride_tricks.as_strided(
        price_volume, shape=shape,
        strides=(price_volume.strides[0], price_volume.strides[0]))
    volume_strided = np.lib.stride_tricks.as_strided(
        volume, shape=shape,
        strides=(volume.strides[0], volume.strides[0]))
    price_volume_sum = price_volume_strided.sum(axis=1)
    volume_sum = volume_strided.sum(axis=1)
    mask = volume_sum > 0
    vwap = np.zeros(len(price))
    vwap[period_lookback-1:] = np.where(mask, price_volume_sum / volume_sum, np.nan)
    return vwap
Test data
import numpy as np
price = np.random.random(10000)
volume = np.random.random(10000)
print(calc_vwap_1(price, volume, 100))
print()
print(calc_vwap_2(price, volume, 100))
print()
print(calc_vwap_3(price, volume, 100))
print()
print(calc_vwap_4(price, volume, 100))
print()
print(calc_vwap_5(price, volume, 100))
print()
print(calc_vwap_6(price, volume, 100))
print()
Results
vwap_1 -> [0. 0. 0. ... 0.47375965 0.47762679 0.48448903] # CORRECT CALCULATION
vwap_2 -> [0. 0. 0. ... 0.53108759 0.51933363 0.51360848]
vwap_3 -> [0. 0. 0. ... 0.49834202 0.4984141 0.49845759]
vwap_4 -> [0. 0. 0. ... 0.49834202 0.4984141 0.49845759]
vwap_5 -> [0. 0. 0. ... 0.48040529 0.48040529 0.48040529]
vwap_6 -> [0. 0. 0. ... 0.47027032 0.48009596 0.48040529]
1 Answer
Yes, it can be done as follows:
Using np.convolve in same mode
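A minimal sketch of a same-mode variant, not necessarily the answerer's exact code. It assumes the goal is to reproduce calc_vwap_1 from the question, whose slice [i - period_lookback : i + 1] spans period_lookback + 1 items, and that the input is at least one window long. It relies on mode='same' being the centred slice of the full convolution, so each trailing sum sits (window - 1) // 2 positions earlier than the index where its window ends. The names rolling_sum_same and vwap_same are hypothetical.

```python
import numpy as np


def rolling_sum_same(values, window):
    """Trailing rolling sum taken from np.convolve(..., mode='same')."""
    n = len(values)
    same = np.convolve(values, np.ones(window), mode="same")
    # 'same' is the centred part of the full convolution, so the sum of
    # values[i - window + 1 : i + 1] is stored at index i - (window - 1) // 2.
    shift = (window - 1) // 2
    return same[window - 1 - shift : n - shift]


def vwap_same(price, volume, period_lookback):
    # Assumption: match the loop version, which sums period_lookback + 1 items.
    window = period_lookback + 1
    pv_sum = rolling_sum_same(price * volume, window)
    vol_sum = rolling_sum_same(volume, window)
    # Windows with zero total volume are left as NaN.
    ratio = np.full(len(vol_sum), np.nan)
    np.divide(pv_sum, vol_sum, out=ratio, where=vol_sum > 0)
    vwap = np.zeros(len(price))
    vwap[period_lookback:] = ratio
    return vwap
```

Slicing the same-mode output from period_lookback - 1 onward, as calc_vwap_2 does, picks up centred windows rather than trailing ones, which is consistent with its results differing from every other variant.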
Using np.convolve in valid mode
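A minimal sketch of a valid-mode variant, not necessarily the answerer's exact code. It assumes the goal is to reproduce calc_vwap_1 from the question: that loop slices [i - period_lookback : i + 1], which is period_lookback + 1 items, and first writes at index period_lookback. The kernel therefore has one more element than in calc_vwap_3, and the output is written one position later. The name vwap_valid is hypothetical.

```python
import numpy as np


def vwap_valid(price, volume, period_lookback):
    # Assumption: match the loop version, which sums period_lookback + 1 items.
    window = period_lookback + 1
    kernel = np.ones(window)
    # 'valid' keeps only fully overlapping positions, giving
    # len(price) - period_lookback trailing sums.
    pv_sum = np.convolve(price * volume, kernel, mode="valid")
    vol_sum = np.convolve(volume, kernel, mode="valid")
    # Windows with zero total volume are left as NaN.
    ratio = np.full(len(vol_sum), np.nan)
    np.divide(pv_sum, vol_sum, out=ratio, where=vol_sum > 0)
    vwap = np.zeros(len(price))
    vwap[period_lookback:] = ratio
    return vwap
```

Using np.divide with a where mask avoids the divide-by-zero warning that np.where(mask, a / b, np.nan) can raise, since the latter evaluates the full quotient before masking.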
Using np.ufunc.reduceat
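A minimal sketch of a reduceat-based rolling sum, not necessarily the answerer's exact code. It assumes the goal is to reproduce calc_vwap_1 from the question (windows of period_lookback + 1 items) and float inputs at least one window long. Passing np.arange(0, len(price), period_lookback) as indices, as calc_vwap_5 does, yields sums over consecutive non-overlapping blocks; overlapping windows need interleaved start and stop indices instead. The names rolling_sum_reduceat and vwap_reduceat are hypothetical.

```python
import numpy as np


def rolling_sum_reduceat(values, window):
    n = len(values)
    starts = np.arange(n - window + 1)
    # Interleave [start, stop, start, stop, ...]; the even slots of the
    # result then hold the sum of values[start:stop].
    idx = np.column_stack((starts, starts + window)).ravel()
    # The last stop equals n, which reduceat rejects as an index, so the
    # input is padded with one trailing zero.
    padded = np.append(values, 0)
    return np.add.reduceat(padded, idx)[::2]


def vwap_reduceat(price, volume, period_lookback):
    # Assumption: match the loop version, which sums period_lookback + 1 items.
    window = period_lookback + 1
    pv_sum = rolling_sum_reduceat(price * volume, window)
    vol_sum = rolling_sum_reduceat(volume, window)
    # Windows with zero total volume are left as NaN.
    ratio = np.full(len(vol_sum), np.nan)
    np.divide(pv_sum, vol_sum, out=ratio, where=vol_sum > 0)
    vwap = np.zeros(len(price))
    vwap[period_lookback:] = ratio
    return vwap
```

This still adds up every window element by element, so it avoids the Python loop without reducing the total amount of arithmetic.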
Using np.lib.stride_tricks.as_strided
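A minimal sketch of a strided variant, not necessarily the answerer's exact code. It assumes the goal is to reproduce calc_vwap_1 from the question, so each window holds period_lookback + 1 items and results start at index period_lookback, and that inputs are 1-D arrays at least one window long. The names rolling_windows and vwap_strided are hypothetical.

```python
import numpy as np
from numpy.lib.stride_tricks import as_strided


def rolling_windows(values, window):
    """Read-only 2-D view in which row k is values[k : k + window]."""
    values = np.ascontiguousarray(values)
    step = values.strides[0]
    return as_strided(
        values,
        shape=(len(values) - window + 1, window),
        strides=(step, step),
        writeable=False,  # the rows overlap in memory, so forbid writes
    )


def vwap_strided(price, volume, period_lookback):
    # Assumption: match the loop version, which sums period_lookback + 1 items.
    window = period_lookback + 1
    pv_sum = rolling_windows(price * volume, window).sum(axis=1)
    vol_sum = rolling_windows(volume, window).sum(axis=1)
    # Windows with zero total volume are left as NaN.
    ratio = np.full(len(vol_sum), np.nan)
    np.divide(pv_sum, vol_sum, out=ratio, where=vol_sum > 0)
    vwap = np.zeros(len(price))
    vwap[period_lookback:] = ratio
    return vwap
```

as_strided performs no bounds checking, so a wrong shape silently reads unrelated memory; passing writeable=False at least guards against accidental writes through the overlapping view.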
Test data
Results
At the end of each function I have noted the timings I got on 10,000 random numbers with a period lookback of 100, as in the OP's test.
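Timings like these could be gathered along the following lines. This is only a sketch of one way to measure, not the answerer's benchmark: vwap_loop mirrors calc_vwap_1 from the question, vwap_convolve is a hypothetical valid-mode equivalent with a kernel of period_lookback + 1 ones, and the timings dictionary is an illustrative name. Absolute numbers depend on the machine, so none are shown.

```python
import timeit

import numpy as np


def vwap_loop(price, volume, period_lookback):
    # Same logic as calc_vwap_1: each window spans period_lookback + 1 items.
    vwap = np.zeros(len(price))
    for i in range(period_lookback, len(price)):
        lb, ub = i - period_lookback, i + 1
        volume_sum = volume[lb:ub].sum()
        if volume_sum > 0:
            vwap[i] = (price[lb:ub] * volume[lb:ub]).sum() / volume_sum
        else:
            vwap[i] = np.nan
    return vwap


def vwap_convolve(price, volume, period_lookback):
    # Hypothetical vectorised equivalent using mode='valid'.
    kernel = np.ones(period_lookback + 1)
    pv_sum = np.convolve(price * volume, kernel, mode="valid")
    vol_sum = np.convolve(volume, kernel, mode="valid")
    ratio = np.full(len(vol_sum), np.nan)
    np.divide(pv_sum, vol_sum, out=ratio, where=vol_sum > 0)
    vwap = np.zeros(len(price))
    vwap[period_lookback:] = ratio
    return vwap


rng = np.random.default_rng(0)
price = rng.random(10_000)
volume = rng.random(10_000)

# Check agreement before comparing speed.
agree = np.allclose(
    vwap_loop(price, volume, 100),
    vwap_convolve(price, volume, 100),
    equal_nan=True,
)
print("outputs agree:", agree)

timings = {}
for fn in (vwap_loop, vwap_convolve):
    # Best of three single runs, in seconds.
    runs = timeit.repeat(lambda: fn(price, volume, 100), number=1, repeat=3)
    timings[fn.__name__] = min(runs)
    print(f"{fn.__name__}: {timings[fn.__name__] * 1e3:.2f} ms")
```

Checking agreement first matters here because a faster function that computes a different window, as in the question, is not a useful comparison.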