我正在使用optimize.minimize
从scipy
进行优化,假设目标函数为fun
。
我需要对我的框架的每一行进行优化,目前我正在使用joblib
中的Parallel
:
import pandas as pd
import time
from joblib import Parallel,delayed
import multiprocessing
from scipy.optimize import basinhopping
import numpy as np
import jax
df=pd.DataFrame()
rng = np.random.default_rng()
df[["p11", "p12", "p13"]]=rng.dirichlet(np.ones(3),size=1120)
df[["s1", "s2", "s3"]]=rng.dirichlet(np.ones(3),size=1120)
num_cores = multiprocessing.cpu_count()-1
def get_attraction(b1,b2, b3):
c1 = 20 * b1 + 12 * b2 + 6 * b3
c2 = 12 * b1 + 24 * b2 + 18 * b3
c3 = 0 * b1 + 14 * b2 + 30 * b3
return c1,c2,c3
def get_a_tau_max(p1_tau, p2_tau, p3_tau, a):
b1_tau_l1_a1_a1 = p1_tau * (1 - a) + a
b2_tau_l1_a1_a1 = p2_tau * (1 - a)
b3_tau_l1_a1_a1 = p3_tau * (1 - a)
a1_tau_l1_a1, a2_tau_l1_a1, a3_tau_l1_a1 = get_attraction(b1_tau_l1_a1_a1, b2_tau_l1_a1_a1,b3_tau_l1_a1_a1)
b1_tau_l1_a1_a2 = p1_tau * (1 - a)
b2_tau_l1_a1_a2 = p2_tau * (1 - a) + a
b3_tau_l1_a1_a2 = p3_tau * (1 - a)
a1_tau_l1_a2, a2_tau_l1_a2, a3_tau_l1_a2 = get_attraction(b1_tau_l1_a1_a2, b2_tau_l1_a1_a2,b3_tau_l1_a1_a2)
b1_tau_l1_a1_a3 = p1_tau * (1 - a)
b2_tau_l1_a1_a3 = p2_tau * (1 - a)
b3_tau_l1_a1_a3 = p3_tau * (1 - a) + a
a1_tau_l1_a3, a2_tau_l1_a3, a3_tau_l1_a3 = get_attraction(b1_tau_l1_a1_a3, b2_tau_l1_a1_a3,b3_tau_l1_a1_a3)
a_tau_max = max(a1_tau_l1_a1, a2_tau_l1_a2, a3_tau_l1_a3)
return a_tau_max
def get_signal_conditional_at(at, b1, b2, b3, a):
if at==1:
b1_tau = b1 * (1-a) + a
b2_tau = b2 * (1-a)
b3_tau = b3 * (1-a)
elif at==2:
b1_tau = b1 * (1-a)
b2_tau = b2 * (1-a) + a
b3_tau = b3 * (1-a)
else:
b1_tau = b1 * (1-a)
b2_tau = b2 * (1-a)
b3_tau = b3 * (1-a) + a
return b1_tau, b2_tau, b3_tau
def get_belief(index,row):
if index <= 1118:
df_mask = pd.DataFrame()
df_mask["weight"] = [0.8 ** i for i in range(0, index+1)][::-1]
dem = df_mask["weight"].sum()
subdf = df.iloc[:index].copy()
s1_history = subdf["s1"].values.tolist()
s2_history = subdf["s2"].values.tolist()
s3_history = subdf["s3"].values.tolist()
belief=[]
for at in [[row["b11"], row["b12"], row["b13"]],
[row["b21"], row["b22"], row["b23"]],
[row["b31"], row["b32"], row["b32"]]]:
df_mask["s1"] = s1_history + [at[0]]
df_mask["s2"] = s2_history + [at[1]]
df_mask["s3"] = s3_history + [at[2]]
nom1 = (df_mask["weight"] * df_mask["s1"]).sum()
nom2 = (df_mask["weight"] * df_mask["s2"]).sum()
nom3 = (df_mask["weight"] * df_mask["s3"]).sum()
b1 = nom1 / dem
b2 = nom2 / dem
b3 = nom3 / dem
belief += [b1,b2,b3]
return belief
else:
return [0,0,0,0,0,0,0,0,0]
def get_prob(x, index,row, lamda, a): #
p1, p2, p3 = x[0], x[1], x[2]
p11 = row["p11"]
p12 = row["p12"]
p13 = row["p13"]
b1 = p11 * (1-a) + p1 * a
b2 = p12 * (1-a) + p2 * a
b3 = p13 * (1-a) + p3 * a
c1,c2,c3=get_attraction(b1,b2,b3)
row["b11"], row["b12"], row["b13"] = get_signal_conditional_at(1, p11,p12,p13,a)
row["b21"], row["b22"], row["b23"] = get_signal_conditional_at(2, p11,p12,p13,a)
row["b31"], row["b32"], row["b33"] = get_signal_conditional_at(3, p11,p12,p13,a)
belief = get_belief(index,row)
t1 = get_a_tau_max(belief[0], belief[1], belief[2], a)
t2 = get_a_tau_max(belief[3], belief[4], belief[5], a)
t3 = get_a_tau_max(belief[6], belief[7], belief[8], a)
a1 = c1+t1
a2 = c2+t2
a3 = c3+t3
nom1 = np.exp(lamda*a1)
nom2 = np.exp(lamda*a2)
nom3 = np.exp(lamda*a3)
dem = nom1 + nom2 + nom3
p1_t = nom1 / dem
p2_t = nom2 / dem
p3_t = nom3 / dem
return (p1_t - p1) ** 2 + (p2_t - p2) ** 2 + (p3_t - p3) ** 2
def get_root(index,row,lamda,a):
fun = get_prob
#jac_ = jax.jacfwd(fun)
result = basinhopping(fun, x0=[0.0, 0.25, 0.75], niter=50, interval=10, seed=np.random.seed(0),
minimizer_kwargs={'args': (index,row,lamda,a), #'jac':jac_,
'method': "SLSQP",
'tol': 1.0e-4,
'bounds': [(0.0, 1.0), (0.0, 1.0), (0.0, 1.0)],
'constraints': {"type": 'eq',
'fun': lambda x: 1 - x[0] - x[1] - x[2],
'jac': lambda x: np.full_like(x,-1)}})
x=np.append(result.x, index)
return x
start = time.time()
p_sv = Parallel(n_jobs=num_cores)(delayed(get_root)(index=index, row=row, lamda=0.5,a=7/13)
for index, row in df.iterrows())
print("elapsed:", time.time() - start)
elapsed: 240.9707179069519
字符串
每次优化都是独立的,行与行之间没有信息交换。
完成整个数据集的优化需要很长时间,而且只占用了大约30%的CPU(我有M1 pro)。
我的问题是我应该设置多少个n_jobs
(或者有其他方法,如改变后端),使其使用100%的CPU,这样我就可以更快地完成这个程序?
我可以访问一个计算机集群,它有2个CPU,每个CPU有64个核心。我试过设置n_job=64
,它没有提供显着的改进(我还在学习如何利用2个CPU。)。
更新:现在我发现,提供目标函数的雅可比矩阵会减慢优化速度。对于80行,使用jac_
需要64秒,不使用它需要20秒(CPU几乎100%运行)。但为什么?我认为提供雅可比矩阵会使它更快。
更新:我添加了一个1120行的fun
和df
。如果我不使用jac_
,使用集群的速度大约快5倍。
1条答案
按热度按时间bttbmeg01#
这不是一个多处理问题,而是一个算法复杂度问题。你试图优化的函数
get_prob
依赖于get_belief
,get_belief
相对于index
是最坏情况下的线性复杂度。结果是你的程序是二次复杂度,这可能会大大降低速度。函数
get_belief
本身非常慢。问题是get_prob
(因此get_belief
)将在优化过程中被优化器调用多次,因此会减慢整个执行。如果您删除它或将其替换为常量值,(例如,用False
替换index <= 118
以缩短昂贵的代码路径)你会发现你的程序速度明显快了很多。在我的电脑上,这导致了20倍的速度。然而,我们可以注意到,
get_belief
并不依赖于优化函数fun
的变量x
,而仅仅依赖于行参数。这可能意味着你可以为 Dataframe 的每一行预先计算get_belief
,而不是在优化函数内部计算,从而大大提高了执行速度。