python-3.x 如何使用并行加速优化

luaexgnf  于 2023-11-20  发布在  Python
关注(0)|答案(1)|浏览(103)

我正在使用optimize.minimizescipy进行优化,假设目标函数为fun
我需要对我的框架的每一行进行优化,目前我正在使用joblib中的Parallel

import pandas as pd
import time
from joblib import Parallel,delayed
import multiprocessing
from scipy.optimize import basinhopping
import numpy as np
import jax

df=pd.DataFrame()
rng = np.random.default_rng()
df[["p11", "p12", "p13"]]=rng.dirichlet(np.ones(3),size=1120)
df[["s1", "s2", "s3"]]=rng.dirichlet(np.ones(3),size=1120)

num_cores = multiprocessing.cpu_count()-1

def get_attraction(b1,b2, b3):
    c1 = 20 * b1 + 12 * b2 + 6 * b3
    c2 = 12 * b1 + 24 * b2 + 18 * b3
    c3 = 0 * b1 + 14 * b2 + 30 * b3
    return c1,c2,c3

def get_a_tau_max(p1_tau, p2_tau, p3_tau, a):
    b1_tau_l1_a1_a1 = p1_tau * (1 - a) + a
    b2_tau_l1_a1_a1 = p2_tau * (1 - a)
    b3_tau_l1_a1_a1 = p3_tau * (1 - a)

    a1_tau_l1_a1, a2_tau_l1_a1, a3_tau_l1_a1 = get_attraction(b1_tau_l1_a1_a1, b2_tau_l1_a1_a1,b3_tau_l1_a1_a1)

    b1_tau_l1_a1_a2 = p1_tau * (1 - a)
    b2_tau_l1_a1_a2 = p2_tau * (1 - a) + a
    b3_tau_l1_a1_a2 = p3_tau * (1 - a)
    a1_tau_l1_a2, a2_tau_l1_a2, a3_tau_l1_a2 = get_attraction(b1_tau_l1_a1_a2, b2_tau_l1_a1_a2,b3_tau_l1_a1_a2)

    b1_tau_l1_a1_a3 = p1_tau * (1 - a)
    b2_tau_l1_a1_a3 = p2_tau * (1 - a)
    b3_tau_l1_a1_a3 = p3_tau * (1 - a) + a
    a1_tau_l1_a3, a2_tau_l1_a3, a3_tau_l1_a3 = get_attraction(b1_tau_l1_a1_a3, b2_tau_l1_a1_a3,b3_tau_l1_a1_a3)

    a_tau_max = max(a1_tau_l1_a1, a2_tau_l1_a2, a3_tau_l1_a3)

    return a_tau_max

def get_signal_conditional_at(at, b1, b2, b3, a):

    if at==1:
        b1_tau = b1 * (1-a) + a
        b2_tau = b2 * (1-a)
        b3_tau = b3 * (1-a)
    elif at==2:
        b1_tau = b1 * (1-a)
        b2_tau = b2 * (1-a) + a
        b3_tau = b3 * (1-a)
    else:
        b1_tau = b1 * (1-a)
        b2_tau = b2 * (1-a)
        b3_tau = b3 * (1-a) + a
    return b1_tau, b2_tau, b3_tau

def get_belief(index,row):
    if index <= 1118:
        df_mask = pd.DataFrame()
        df_mask["weight"] = [0.8 ** i for i in range(0, index+1)][::-1]
        dem = df_mask["weight"].sum()

        subdf = df.iloc[:index].copy()
        s1_history = subdf["s1"].values.tolist()
        s2_history = subdf["s2"].values.tolist()
        s3_history = subdf["s3"].values.tolist()

        belief=[]
        for at in [[row["b11"], row["b12"], row["b13"]],
                    [row["b21"], row["b22"], row["b23"]],
                    [row["b31"], row["b32"], row["b32"]]]:

            df_mask["s1"] = s1_history + [at[0]]
            df_mask["s2"] = s2_history + [at[1]]
            df_mask["s3"] = s3_history + [at[2]]

            nom1 = (df_mask["weight"] * df_mask["s1"]).sum()
            nom2 = (df_mask["weight"] * df_mask["s2"]).sum()
            nom3 = (df_mask["weight"] * df_mask["s3"]).sum()

            b1 = nom1 / dem
            b2 = nom2 / dem
            b3 = nom3 / dem
            belief += [b1,b2,b3]
        return belief
    else:
        return [0,0,0,0,0,0,0,0,0]

def get_prob(x, index,row, lamda, a): #
    p1, p2, p3 = x[0], x[1], x[2]

    p11 = row["p11"]
    p12 = row["p12"]
    p13 = row["p13"]

    b1 = p11 * (1-a) + p1 * a
    b2 = p12 * (1-a) + p2 * a
    b3 = p13 * (1-a) + p3 * a

    c1,c2,c3=get_attraction(b1,b2,b3)

    row["b11"], row["b12"], row["b13"] = get_signal_conditional_at(1, p11,p12,p13,a)
    row["b21"], row["b22"], row["b23"] = get_signal_conditional_at(2, p11,p12,p13,a)
    row["b31"], row["b32"], row["b33"] = get_signal_conditional_at(3, p11,p12,p13,a)

    belief = get_belief(index,row)

    t1 = get_a_tau_max(belief[0], belief[1], belief[2], a)
    t2 = get_a_tau_max(belief[3], belief[4], belief[5], a)
    t3 = get_a_tau_max(belief[6], belief[7], belief[8], a)

    a1 = c1+t1
    a2 = c2+t2
    a3 = c3+t3

    nom1 = np.exp(lamda*a1)
    nom2 = np.exp(lamda*a2)
    nom3 = np.exp(lamda*a3)
    dem = nom1 + nom2 + nom3

    p1_t = nom1 / dem
    p2_t = nom2 / dem
    p3_t = nom3 / dem

    return (p1_t - p1) ** 2 + (p2_t - p2) ** 2 + (p3_t - p3) ** 2

def get_root(index,row,lamda,a):
        fun = get_prob
        #jac_ = jax.jacfwd(fun)
        result = basinhopping(fun, x0=[0.0, 0.25, 0.75], niter=50, interval=10, seed=np.random.seed(0),
                              minimizer_kwargs={'args': (index,row,lamda,a), #'jac':jac_,
                                                'method': "SLSQP",
                                                'tol': 1.0e-4,
                                                'bounds': [(0.0, 1.0), (0.0, 1.0), (0.0, 1.0)],
                                                'constraints': {"type": 'eq',
                                                                'fun': lambda x: 1 - x[0] - x[1] - x[2],
                                                                'jac': lambda x: np.full_like(x,-1)}})
        x=np.append(result.x, index)
        return x

start = time.time()
p_sv = Parallel(n_jobs=num_cores)(delayed(get_root)(index=index, row=row, lamda=0.5,a=7/13)
                                  for index, row in df.iterrows())
print("elapsed:", time.time() - start)
elapsed: 240.9707179069519

字符串
每次优化都是独立的,行与行之间没有信息交换。
完成整个数据集的优化需要很长时间,而且只占用了大约30%的CPU(我有M1 pro)。
我的问题是我应该设置多少个n_jobs(或者有其他方法,如改变后端),使其使用100%的CPU,这样我就可以更快地完成这个程序?
我可以访问一个计算机集群,它有2个CPU,每个CPU有64个核心。我试过设置n_job=64,它没有提供显着的改进(我还在学习如何利用2个CPU。)。
更新:现在我发现,提供目标函数的雅可比矩阵会减慢优化速度。对于80行,使用jac_需要64秒,不使用它需要20秒(CPU几乎100%运行)。但为什么?我认为提供雅可比矩阵会使它更快。
更新:我添加了一个1120行的fundf。如果我不使用jac_,使用集群的速度大约快5倍。

bttbmeg0

bttbmeg01#

这不是一个多处理问题,而是一个算法复杂度问题。你试图优化的函数get_prob依赖于get_beliefget_belief相对于index是最坏情况下的线性复杂度。结果是你的程序是二次复杂度,这可能会大大降低速度。
函数get_belief本身非常慢。问题是get_prob(因此get_belief)将在优化过程中被优化器调用多次,因此会减慢整个执行。如果您删除它或将其替换为常量值,(例如,用False替换index <= 118以缩短昂贵的代码路径)你会发现你的程序速度明显快了很多。在我的电脑上,这导致了20倍的速度。
然而,我们可以注意到,get_belief并不依赖于优化函数fun的变量x,而仅仅依赖于行参数。这可能意味着你可以为 Dataframe 的每一行预先计算get_belief,而不是在优化函数内部计算,从而大大提高了执行速度。

相关问题