Speed up DataFrame row-wise lambda iteration with numpy?

Posted by 0vvn1miw on 2023-06-06 in Other

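Below is my code. It starts from 1,000,000 permutations and then discards the ones that make no sense (for example, rows where indicator_01 is greater than 6 and less than 4). In this example, that leaves 166,375 permutations to process.
The code iterates over each of the 166,375 rows, performs a groupby to sum pnl (profit in $), and then generates an output showing the best overall profit.
This process takes about 30 minutes. From reading many questions here, I understand that using numpy could speed this up considerably. I have been working on this for several days without progress, so I am posting my original code (which uses a lambda over each row) and asking the community for help.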

###########################################################
### Backtest Optimizer - Brute Force - Three indicators ###
###########################################################

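import pandas as pd
import itertools
import time
import numpy as np
from datetime import datetime  # needed for datetime.now() below
from pytz import timezone  # assumption: timezone() below comes from pytz
from IPython.display import display  # display() is built in on Colab/Jupyter; explicit import for scripts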

pd.set_option('display.max_rows', 15)
tz = timezone('America/Denver')

x = 'QQQ'
trades_threshold = 1000 

print('Backtest Optimizer started at',datetime.now(tz).strftime('%H:%M:%S'))
checkpoint_01 = time.perf_counter()

filename = str('/content/'+x+'_summarized_trades.csv')
summarized_trades = pd.read_csv(filename)
print('Number of trades',len(summarized_trades))

##############################
list_of_indicators = ['rsi','std_pct','atr_5m/last_trade']
##############################

indicator_01 = list_of_indicators[0]
print('Indicator_01 is ',indicator_01)
indicator_02 = list_of_indicators[1]
print('Indicator_02 is ',indicator_02)
indicator_03 = list_of_indicators[2]
print('Indicator_03 is ',indicator_03)

summarized_trades['indicator_01_quantile'] = pd.qcut(summarized_trades[indicator_01].values, 10).codes
summarized_trades['indicator_02_quantile'] = pd.qcut(summarized_trades[indicator_02].values, 10).codes
summarized_trades['indicator_03_quantile'] = pd.qcut(summarized_trades[indicator_03].values, 10).codes
summarized_trades['indicator_01_quantile_min'] = pd.qcut(summarized_trades[indicator_01], 10).apply(lambda x: x.left)
summarized_trades['indicator_01_quantile_max'] = pd.qcut(summarized_trades[indicator_01], 10).apply(lambda x: x.right)
summarized_trades['indicator_02_quantile_min'] = pd.qcut(summarized_trades[indicator_02], 10).apply(lambda x: x.left)
summarized_trades['indicator_02_quantile_max'] = pd.qcut(summarized_trades[indicator_02], 10).apply(lambda x: x.right)
summarized_trades['indicator_03_quantile_min'] = pd.qcut(summarized_trades[indicator_03], 10).apply(lambda x: x.left)
summarized_trades['indicator_03_quantile_max'] = pd.qcut(summarized_trades[indicator_03], 10).apply(lambda x: x.right)

summarized_trades['number_of_trades'] = 1
summarized_trades['number_of_winners'] = summarized_trades.net_pnl.apply(lambda x: 1 if x >0 else 0)

### Group into similar quantile buckets
grouped_trades = summarized_trades.groupby(by=['indicator_01_quantile','indicator_02_quantile','indicator_03_quantile']).agg({'net_pnl':'sum','number_of_trades':'sum','number_of_winners':'sum','indicator_01_quantile_min':'min','indicator_01_quantile_max':'max','indicator_02_quantile_min':'min','indicator_02_quantile_max':'max','indicator_03_quantile_min':'min','indicator_03_quantile_max':'max'})
grouped_trades = grouped_trades.reset_index()  # Need to do this, otherwise unhashable
print('Length of grouped trades =',len(grouped_trades))
display(grouped_trades)


output = []
default_possibilities = [0,1,2,3,4,5,6,7,8,9] # Works for using 10 quantiles
possibilities = list(itertools.product(default_possibilities,default_possibilities,default_possibilities,default_possibilities,default_possibilities,default_possibilities))  # each of the three indicators can have 10x10 permutations
print('length of possibilities',len(possibilities))
possibilities = [item for item in possibilities if item[1]>=item[0] and item[3]>=item[2] and item[5]>=item[4]] #only keep combinations that make sense (e.g., tech indicator >2 and <4, but not >6 and <3)
print('length of revised possibilities',len(possibilities))

### iterate through the combinations that make sense
for i in possibilities:
  a = i[0]
  b = i[1]
  c = i[2]
  d = i[3]
  e = i[4]
  f = i[5]
  ### for each row, create a new column to filter on; if the rows marked YES meet the minimum trades_threshold, append the results to the output
  ### UTILIZE NUMPY INSTEAD OF LAMBDA ROW????
  grouped_trades['filter_01'] = grouped_trades.apply(lambda row: 'YES' if row['indicator_01_quantile'] >= a and row['indicator_01_quantile'] <= b and row['indicator_02_quantile'] >= c and row['indicator_02_quantile'] <= d and row['indicator_03_quantile'] >= e and row['indicator_03_quantile'] <= f else 'NO', axis=1)
  grouped_trades_filtered = grouped_trades.loc[(grouped_trades['filter_01']=='YES')]
  number_of_trades = grouped_trades_filtered.number_of_trades.sum()
  if number_of_trades < trades_threshold:
    continue
  else:
    number_of_winners = grouped_trades_filtered.number_of_winners.sum()
    net_pnl = grouped_trades_filtered.net_pnl.sum()
    output.append({'tech_indicator_01_min':a,'tech_indicator_01_max':b, 'tech_indicator_02_min':c,'tech_indicator_02_max':d, 'tech_indicator_03_min':e,'tech_indicator_03_max':f,'number_of_trades':number_of_trades,'number_of_winners':number_of_winners,'net_pnl':net_pnl})

output_df = pd.DataFrame(output)
print('Length of output',len(output_df))
output_df['win_rate'] = output_df.number_of_winners/output_df.number_of_trades
final_df = output_df.copy() ### if comparing with threshold above
final_df.sort_values(by='net_pnl', ascending=False, inplace=True)
display(final_df)
checkpoint_02 = time.perf_counter()
print('Total time',checkpoint_02-checkpoint_01)

# Take row 1 of the final_df, and take each value and use quantile
tech_indicator_01_min_quantile = final_df.iloc[0]['tech_indicator_01_min']
print(tech_indicator_01_min_quantile)
tech_indicator_01_max_quantile = final_df.iloc[0]['tech_indicator_01_max']
print(tech_indicator_01_max_quantile)
indicator_01_min_df = grouped_trades.loc[(grouped_trades.indicator_01_quantile == tech_indicator_01_min_quantile)]
tech_indicator_01_min_value = indicator_01_min_df.iloc[0]['indicator_01_quantile_min']
print(tech_indicator_01_min_value)
indicator_01_max_df = grouped_trades.loc[(grouped_trades.indicator_01_quantile == tech_indicator_01_max_quantile)]
tech_indicator_01_max_value = indicator_01_max_df.iloc[0]['indicator_01_quantile_max']
print(tech_indicator_01_max_value)

tech_indicator_02_min_quantile = final_df.iloc[0]['tech_indicator_02_min']
print(tech_indicator_02_min_quantile)
tech_indicator_02_max_quantile = final_df.iloc[0]['tech_indicator_02_max']
print(tech_indicator_02_max_quantile)
indicator_02_min_df = grouped_trades.loc[(grouped_trades.indicator_02_quantile == tech_indicator_02_min_quantile)]
tech_indicator_02_min_value = indicator_02_min_df.iloc[0]['indicator_02_quantile_min']
print(tech_indicator_02_min_value)
indicator_02_max_df = grouped_trades.loc[(grouped_trades.indicator_02_quantile == tech_indicator_02_max_quantile)]
tech_indicator_02_max_value = indicator_02_max_df.iloc[0]['indicator_02_quantile_max']
print(tech_indicator_02_max_value)

tech_indicator_03_min_quantile = final_df.iloc[0]['tech_indicator_03_min']
print(tech_indicator_03_min_quantile)
tech_indicator_03_max_quantile = final_df.iloc[0]['tech_indicator_03_max']
print(tech_indicator_03_max_quantile)
indicator_03_min_df = grouped_trades.loc[(grouped_trades.indicator_03_quantile == tech_indicator_03_min_quantile)]
tech_indicator_03_min_value = indicator_03_min_df.iloc[0]['indicator_03_quantile_min']
print(tech_indicator_03_min_value)
indicator_03_max_df = grouped_trades.loc[(grouped_trades.indicator_03_quantile == tech_indicator_03_max_quantile)]
tech_indicator_03_max_value = indicator_03_max_df.iloc[0]['indicator_03_quantile_max']
print(tech_indicator_03_max_value)

print(indicator_01,'>=',tech_indicator_01_min_value,'and',indicator_01,'<=',tech_indicator_01_max_value,'and',indicator_02,'>=',tech_indicator_02_min_value,'and',indicator_02,'<=',tech_indicator_02_max_value,'and',indicator_03,'>=',tech_indicator_03_min_value,'and',indicator_03,'<=',tech_indicator_03_max_value)

Answer 1 (bxgwgixi)

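You can use numpy broadcasting (the same idea as taking the outer product of two arrays) to vectorize the operation. This is a common pattern for computing the same operation for all rows at once (if you have enough memory...).
You need to understand two things: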

  • xxx[:, None]: adds a new axis, e.g. array([0, 1, 2])[:, None] => array([[0], [1], [2]])
  • np.sum(..., axis=n): computes the sum along one axis rather than over the whole array (reducing the dimension).
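
As a small illustration of the two points above, here is a minimal sketch on toy arrays (the names `x`, `row`, `mask` and `counts` are made up for this example and do not appear in the question):

```python
import numpy as np

x = np.array([0, 1, 2])
col = x[:, None]              # new trailing axis: shape (3,) becomes (3, 1)

# Comparing a (3, 1) column with a (4,) row broadcasts to a (3, 4) grid,
# i.e. every element of x is compared with every element of row.
row = np.array([0, 1, 2, 3])
mask = col <= row             # mask[i, j] is True when x[i] <= row[j]

# axis=1 sums across the columns, leaving one value per element of x.
counts = np.sum(mask, axis=1)
print(col.shape, mask.shape, counts)   # (3, 1) (3, 4) [4 3 2]
```

The same shape logic is what lets one comparison cover every combination of bounds against every row of the grouped data.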

The code below could be made shorter at the cost of readability. Replace your loop with the following:

# Convert to numpy
arr = np.array(possibilities)
lb, ub = arr[:, 0::2], arr[:, 1::2]

cols = ['indicator_01_quantile', 'indicator_02_quantile', 'indicator_03_quantile']
qtl = grouped_trades[cols].values

# Compute with numpy broadcasting
m1 = np.all(qtl >= lb[:, None], axis=2)  # lower bounds, shape (n_possibilities, n_groups)
m2 = np.all(qtl <= ub[:, None], axis=2)  # upper bounds, same shape
m3 = m1 & m2  # True where a group falls inside all three ranges of a combination

# Sum over the matching groups of each combination (axis=1 runs over the groups)
number_of_trades = np.sum(m3 * grouped_trades['number_of_trades'].values, axis=1)
m4 = number_of_trades >= trades_threshold  # combinations with enough trades
number_of_winners = np.sum(m3 * grouped_trades['number_of_winners'].values, axis=1)
net_pnl = np.sum(m3 * grouped_trades['net_pnl'].values, axis=1)

# Create output dataframe
cols = ['tech_indicator_01_min', 'tech_indicator_01_max', 'tech_indicator_02_min',
        'tech_indicator_02_max', 'tech_indicator_03_min', 'tech_indicator_03_max']
df1 = pd.DataFrame(arr[m4], columns=cols)
df2 = pd.DataFrame({'number_of_trades': number_of_trades[m4],
                    'number_of_winners': number_of_winners[m4],
                    'net_pnl': net_pnl[m4]})
output_df = pd.concat([df1, df2], axis=1)
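
If memory is tight, the same broadcasting idea can be applied to a slice of the combinations at a time. The following is only a sketch on synthetic data, not part of the original answer: `range_sums`, `chunk_size` and the toy arrays are hypothetical names introduced here, and the per-combination sums are computed with a matrix product instead of `np.sum`.

```python
import itertools
import numpy as np

def range_sums(qtl, values, bounds, chunk_size=20_000):
    """Sum `values` over the rows of `qtl` inside each set of bounds.

    qtl    : (n_groups, k) integer quantile codes
    values : (n_groups, m) float columns to sum (e.g. trades, winners, pnl)
    bounds : (n_possibilities, 2 * k) laid out as [min_1, max_1, min_2, max_2, ...]
    """
    lb, ub = bounds[:, 0::2], bounds[:, 1::2]
    out = np.empty((len(bounds), values.shape[1]), dtype=np.float64)
    for start in range(0, len(bounds), chunk_size):
        stop = start + chunk_size
        lo = lb[start:stop, None, :]          # (chunk, 1, k)
        hi = ub[start:stop, None, :]
        # (chunk, n_groups): True where a group lies inside every range
        mask = np.all((qtl >= lo) & (qtl <= hi), axis=2)
        # Matrix product sums the selected rows of `values` per combination
        out[start:stop] = mask.astype(np.float64) @ values
    return out

# Toy data (assumption): 3 indicators with 4 quantile levels each
rng = np.random.default_rng(0)
levels = range(4)
qtl = np.array(list(itertools.product(levels, repeat=3)))
values = rng.integers(0, 50, size=(len(qtl), 2)).astype(np.float64)
bounds = np.array([p for p in itertools.product(levels, repeat=6)
                   if p[0] <= p[1] and p[2] <= p[3] and p[4] <= p[5]])

sums = range_sums(qtl, values, bounds, chunk_size=300)

# Cross-check against a plain Python loop over the combinations
for p, s in zip(bounds, sums):
    keep = np.all((qtl >= p[0::2]) & (qtl <= p[1::2]), axis=1)
    assert np.allclose(values[keep].sum(axis=0), s)
```

With the question's data, `qtl` would presumably be the three quantile columns of `grouped_trades`, `values` the `number_of_trades`, `number_of_winners` and `net_pnl` columns converted to float, and the rows to keep those whose first sum is at least `trades_threshold`.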
