找到变量的值以最大化 Python 中函数的 return

Find the value of variables to maximize return of function in Python

我想获得与 Excel 中求解器函数的工作方式类似的结果。我一直在阅读 Scipy 优化,并一直在尝试构建一个函数来输出我想找到的最大值。该等式基于四个不同的变量,请参阅下面的代码:

import pandas as pd
import numpy as np
from scipy import optimize

cols = {
    'Dividend2': [9390, 7448, 177], 
    'Probability': [341, 376, 452], 
    'EV': [0.53, 0.60, 0.55], 
    'Dividend': [185, 55, 755], 
    'EV2': [123, 139, 544],
}

df = pd.DataFrame(cols)

def myFunc(params):
    """myFunc metric."""
    (ev, bv, vc, dv) = params
    df['Number'] = np.where(df['Dividend2'] <= vc, 1, 0) \
                    + np.where(df['EV2'] <= dv, 1, 0)
    df['Return'] =  np.where(
        df['EV'] <= ev, 0, np.where(
            df['Probability'] >= bv, 0, df['Number'] * df['Dividend'] - (vc + dv)
        )
    )
    return -1 * (df['Return'].sum())

b1 = [(0.2,4), (300,600), (0,1000), (0,1000)]
start = [0.2, 600, 1000, 1000]
result = optimize.minimize(fun=myFunc, bounds=b1, x0=start)
print(result)

所以我想在更改变量ev,bv,vc & dv 时,在df 中找到列Return 的最大值。我希望它们介于 ev: 0.2-4, bv: 300-600, vc: 0-1000 & dv: 0-1000.

当 运行 我的代码时,函数似乎停止在 x0。

解决方案

我将使用 optuna 库为您提供针对您要解决的问题类型的解决方案。我试过使用 scipy.optimize.minimize,看起来 loss-landscape 在大多数地方可能很平坦,因此公差强制最小化算法 (L-BFGS-B) 过早停止。

使用 optuna,它相当简单。 Optuna 只需要一个 objective 函数和一个 study。该研究将各种 trials 发送到 objective 函数,该函数反过来评估您选择的指标。

我定义了另一个度量函数 myFunc2,主要是删除 np.where 调用,因为您可以 do-away 使用它们(减少步骤数)并使函数稍微更快。

# install optuna with pip
pip install -Uqq optuna

虽然我研究过使用相当平滑的损失景观,但有时有必要将景观本身可视化。 B 部分的答案详细阐述了可视化。但是,如果您想使用更平滑的度量函数怎么办? D 部分对此进行了说明。

code-execution的顺序应该是:

  • 部分:C >> B >> B.1 >> B.2 >> B.3 >> A.1 >> A.2 >> D

一个。建立直觉

如果您使用 search_space 部分 B.2 中提到的所有可能参数值创建一个 hiplot(也称为带有 parallel-coordinates 的绘图),并绘制最低的myFunc2 的 50 个输出,它看起来像这样:

search_space 中绘制所有这些点将如下所示:

A.1。各种损失景观视图 Parameter-Pairs

这些数字表明,对于四个参数 (ev, bv, vc, dv) 中的任意两个,loss-landscape 大多是持平的。这可能是为什么与其他两个采样器(TPESamplerRandomSampler)。请单击下面的任何图片以放大查看。这也可能是 scipy.optimize.minimize(method="L-BFGS-B") 立即失败的原因。



01. dv-vc


02. dv-bv


03. dv-ev


04. bv-ev


05. cv-ev


06. vc-bv
# Create contour plots for parameter-pairs
study_name = "GridSampler"
study = studies.get(study_name)

views = [("dv", "vc"), ("dv", "bv"), ("dv", "ev"), 
         ("bv", "ev"), ("vc", "ev"), ("vc", "bv")]

for i, (x, y) in enumerate(views):
    print(f"Figure: {i}/{len(views)}")
    study_contour_plot(study=study, params=(x, y))

A.2。参数重要性

study_name = "GridSampler"
study = studies.get(study_name)

fig = optuna.visualization.plot_param_importances(study)
fig.update_layout(title=f'Hyperparameter Importances: {study.study_name}', 
                  autosize=False,
                  width=800, height=500,
                  margin=dict(l=65, r=50, b=65, t=90))
fig.show()

乙。代码

部分 B.3. 找到最低指标 -88.333

  • {'ev': 0.2, 'bv': 500.0, 'vc': 222.2222, 'dv': 0.0}
import warnings
from functools import partial
from typing import Iterable, Optional, Callable, List

import pandas as pd
import numpy as np
import optuna
from tqdm.notebook import tqdm

warnings.filterwarnings("ignore", category=optuna.exceptions.ExperimentalWarning)
optuna.logging.set_verbosity(optuna.logging.WARNING)

PARAM_NAMES: List[str] = ["ev", "bv", "vc", "dv",]
DEFAULT_METRIC_FUNC: Callable = myFunc2


def myFunc2(params):
    """myFunc metric v2 with lesser steps."""
    global df # define as a global variable
    (ev, bv, vc, dv) = params
    df['Number'] = (df['Dividend2'] <= vc) * 1 + (df['EV2'] <= dv) * 1
    df['Return'] =  (
        (df['EV'] > ev) 
        * (df['Probability'] < bv) 
        * (df['Number'] * df['Dividend'] - (vc + dv))
    )
    return -1 * (df['Return'].sum())


def make_param_grid(
        bounds: List[Tuple[float, float]], 
        param_names: Optional[List[str]]=None, 
        num_points: int=10, 
        as_dict: bool=True,
    ) -> Union[pd.DataFrame, Dict[str, List[float]]]:
    """
    Create parameter search space.

    Example:
    
        grid = make_param_grid(bounds=b1, num_points=10, as_dict=True)
    
    """
    if param_names is None:
        param_names = PARAM_NAMES # ["ev", "bv", "vc", "dv"]
    bounds = np.array(bounds)
    grid = np.linspace(start=bounds[:,0], 
                       stop=bounds[:,1], 
                       num=num_points, 
                       endpoint=True, 
                       axis=0)
    grid = pd.DataFrame(grid, columns=param_names)
    if as_dict:
        grid = grid.to_dict()
        for k,v in grid.items():
            grid.update({k: list(v.values())})
    return grid


def objective(trial, 
              bounds: Optional[Iterable]=None, 
              func: Optional[Callable]=None, 
              param_names: Optional[List[str]]=None):
    """Objective function, necessary for optimizing with optuna."""
    if param_names is None:
        param_names = PARAM_NAMES
    if (bounds is None):
        bounds = ((-10, 10) for _ in param_names)
    if not isinstance(bounds, dict):
        bounds = dict((p, (min(b), max(b))) 
                        for p, b in zip(param_names, bounds))
    if func is None:
        func = DEFAULT_METRIC_FUNC

    params = dict(
        (p, trial.suggest_float(p, bounds.get(p)[0], bounds.get(p)[1])) 
        for p in param_names        
    )
    # x = trial.suggest_float('x', -10, 10)
    return func((params[p] for p in param_names))


def optimize(objective: Callable, 
             sampler: Optional[optuna.samplers.BaseSampler]=None, 
             func: Optional[Callable]=None, 
             n_trials: int=2, 
             study_direction: str="minimize",
             study_name: Optional[str]=None,
             formatstr: str=".4f",
             verbose: bool=True):
    """Optimizing function using optuna: creates a study."""
    if func is None:
        func = DEFAULT_METRIC_FUNC
    study = optuna.create_study(
        direction=study_direction, 
        sampler=sampler, 
        study_name=study_name)
    study.optimize(
        objective, 
        n_trials=n_trials, 
        show_progress_bar=True, 
        n_jobs=1,
    )
    if verbose:
        metric = eval_metric(study.best_params, func=myFunc2)
        msg = format_result(study.best_params, metric, 
                            header=study.study_name, 
                            format=formatstr)
        print(msg)
    return study


def format_dict(d: Dict[str, float], format: str=".4f") -> Dict[str, float]:
    """
    Returns formatted output for a dictionary with 
    string keys and float values.
    """
    return dict((k, float(f'{v:{format}}')) for k,v in d.items())


def format_result(d: Dict[str, float], 
                  metric_value: float, 
                  header: str='', 
                  format: str=".4f"): 
    """Returns formatted result."""
    msg = f"""Study Name: {header}\n{'='*30}
    
    ✅ study.best_params: \n\t{format_dict(d)}
    ✅ metric: {metric_value} 
    """
    return msg


def study_contour_plot(study: optuna.Study, 
                       params: Optional[List[str]]=None, 
                       width: int=560, 
                       height: int=500):
    """
    Create contour plots for a study, given a list or 
    tuple of two parameter names.
    """
    if params is None:
        params = ["dv", "vc"]
    fig = optuna.visualization.plot_contour(study, params=params)
    fig.update_layout(
        title=f'Contour Plot: {study.study_name} ({params[0]}, {params[1]})', 
        autosize=False,
        width=width, 
        height=height,
        margin=dict(l=65, r=50, b=65, t=90))
    fig.show()


bounds = [(0.2, 4), (300, 600), (0, 1000), (0, 1000)]
param_names = PARAM_NAMES # ["ev", "bv", "vc", "dv",]
pobjective = partial(objective, bounds=bounds)

# Create an empty dict to contain 
# various subsequent studies.
studies = dict()

Optuna 附带了几种不同类型的采样器。采样器提供了 optuna 如何从 parametr-space 采样点并评估 objective 函数的策略。

B.1 使用TPESampler

from optuna.samplers import TPESampler

sampler = TPESampler(seed=42)

study_name = "TPESampler"
studies[study_name] = optimize(
    pobjective, 
    sampler=sampler, 
    n_trials=100, 
    study_name=study_name,
)

# Study Name: TPESampler
# ==============================
#    
#     ✅ study.best_params: 
#   {'ev': 1.6233, 'bv': 585.2143, 'vc': 731.9939, 'dv': 598.6585}
#     ✅ metric: -0.0 

B.2。使用 GridSampler

GridSampler 需要参数搜索网格。这里我们使用下面的 search_space.

from optuna.samplers import GridSampler

# create search-space
search_space = make_param_grid(bounds=bounds, num_points=10, as_dict=True)

sampler = GridSampler(search_space)

study_name = "GridSampler"
studies[study_name] = optimize(
    pobjective, 
    sampler=sampler, 
    n_trials=2000, 
    study_name=study_name,
)

# Study Name: GridSampler
# ==============================
#    
#     ✅ study.best_params: 
#   {'ev': 0.2, 'bv': 500.0, 'vc': 222.2222, 'dv': 0.0}
#     ✅ metric: -88.33333333333337 

B.3。使用 RandomSampler

from optuna.samplers import RandomSampler

sampler = RandomSampler(seed=42)

study_name = "RandomSampler"
studies[study_name] = optimize(
    pobjective, 
    sampler=sampler, 
    n_trials=300, 
    study_name=study_name,
)

# Study Name: RandomSampler
# ==============================
#    
#     ✅ study.best_params: 
#   {'ev': 1.6233, 'bv': 585.2143, 'vc': 731.9939, 'dv': 598.6585}
#     ✅ metric: -0.0 

C。虚拟数据

为了可重现性,我保留了这里使用的虚拟数据的记录。

import pandas as pd
import numpy as np
from scipy import optimize

cols = {
    'Dividend2': [9390, 7448, 177], 
    'Probability': [341, 376, 452], 
    'EV': [0.53, 0.60, 0.55], 
    'Dividend': [185, 55, 755], 
    'EV2': [123, 139, 544],
}

df = pd.DataFrame(cols)

def myFunc(params):
    """myFunc metric."""
    (ev, bv, vc, dv) = params
    df['Number'] = np.where(df['Dividend2'] <= vc, 1, 0) \
                    + np.where(df['EV2'] <= dv, 1, 0)
    df['Return'] =  np.where(
        df['EV'] <= ev, 0, np.where(
            df['Probability'] >= bv, 0, df['Number'] * df['Dividend'] - (vc + dv)
        )
    )
    return -1 * (df['Return'].sum())

b1 = [(0.2,4), (300,600), (0,1000), (0,1000)]
start = [0.2, 600, 1000, 1000]
result = optimize.minimize(fun=myFunc, bounds=b1, x0=start)
print(result)

C.1。一个观察

因此,乍一看似乎代码执行正确并且没有抛出任何错误。它说它已成功找到最小化的解决方案。

      fun: -0.0
 hess_inv: <4x4 LbfgsInvHessProduct with dtype=float64>
      jac: array([0., 0., 3., 3.])
  message: b'CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL' # 
     nfev: 35
      nit: 2
   status: 0
  success: True
        x: array([2.e-01, 6.e+02, 0.e+00, 0.e+00]) # 

仔细观察发现解决方案(参见 )与起点 [0.2, 600, 1000, 1000] 没有什么不同。所以,似乎什么都没有发生,算法只是提前结束了?!!

现在看看上面的 message(参见 )。如果我们 运行 对此进行 google 搜索,您可能会找到类似这样的内容:

  • 总结

    b'CONVERGENCE: NORM_OF_PROJECTED_GRADIENT_<=_PGTOL'

    If the loss-landscape does not have a smoothely changing topography, the gradient descent algorithms will soon find that from one iteration to the next, there isn't much change happening and hence, will terminate further seeking. Also, if the loss-landscape is rather flat, this could see similar fate and get early-termination.

D.使损失景观更平滑

value = 1 if x>5 else 0 的二进制计算本质上是一个 step-function,它为 x 的所有大于 5 和 [=] 的值分配 1 71=] 否则。但这引入了一个扭结 - 平滑度的不连续性,这可能会在遍历 loss-landscape.

时引入问题

如果我们使用 sigmoid 函数来引入一些平滑度呢?

# Define sigmoid function
def sigmoid(x):
    """Sigmoid function."""
    return 1 / (1 + np.exp(-x))

对于上面的例子,我们可以修改如下。

你可以额外引入另一个因素(gamma: γ)如下,并尝试优化它,使景观更平滑。因此,通过控制 gamma 因子,您可以使函数更平滑并改变它在 x = 5

附近变化的速度

上图是用下面的code-snippet.

创建的
import matplotlib.pyplot as plt

%matplotlib inline 
%config InlineBackend.figure_format = 'svg' # 'svg', 'retina' 
plt.style.use('seaborn-white')

def make_figure(figtitle: str="Sigmoid Function"):
    """Make the demo figure for using sigmoid."""

    x = np.arange(-20, 20.01, 0.01)
    y1 = sigmoid(x)
    y2 = sigmoid(x - 5)
    y3 = sigmoid((x - 5)/3)
    y4 = sigmoid((x - 5)/0.3)
    fig, ax = plt.subplots(figsize=(10,5))
    plt.sca(ax)
    plt.plot(x, y1, ls="-", label="$\sigma(x)$")
    plt.plot(x, y2, ls="--", label="$\sigma(x - 5)$")
    plt.plot(x, y3, ls="-.", label="$\sigma((x - 5) / 3)$")
    plt.plot(x, y4, ls=":", label="$\sigma((x - 5) / 0.3)$")
    plt.axvline(x=0, ls="-", lw=1.3, color="cyan", alpha=0.9)
    plt.axvline(x=5, ls="-", lw=1.3, color="magenta", alpha=0.9)
    plt.legend()
    plt.title(figtitle)
    plt.show()

make_figure()

D.1。度量平滑示例

以下是如何应用函数平滑的示例。

from functools import partial

def sig(x, gamma: float=1.):
    return sigmoid(x/gamma)

def myFunc3(params, gamma: float=0.5):
    """myFunc metric v3 with smoother metric."""
    (ev, bv, vc, dv) = params
    _sig = partial(sig, gamma=gamma)
    df['Number'] = _sig(x = -(df['Dividend2'] - vc)) * 1 \
                    + _sig(x = -(df['EV2'] - dv)) * 1
    df['Return'] = (
        _sig(x = df['EV'] - ev) 
        * _sig(x = -(df['Probability'] - bv))
        * _sig(x = df['Number'] * df['Dividend'] - (vc + dv))
    )
    return -1 * (df['Return'].sum())

正如我在评论中提到的,关键问题是 np.where() 既不可微也不连续 。因此,您的 objective 函数违反了 scipy.optimize.minimize.

引擎盖下的大多数 (derivate-based) 算法的数学假设

所以,基本上,您有三个选择:

  1. 使用 derivative-free 算法并希望最好。
  2. np.where() 替换为平滑的近似值,这样您的 objective 就可以连续微分了。
  3. 将您的问题重新表述为 MIP

由于@CypherX 的回答追求的是方法 1,我想重点关注方法 2。这里,主要思想是近似 np.where 函数。一种可能的近似值是

def smooth_if_then(x):
    eps = 1e-12
    return 0.5 + x/(2*np.sqrt(eps + x*x))

连续可微。然后,给定一个 np.ndarray arr 和一个标量值 x,表达式 np.where(arr <= x, 1, 0) 等价于 smooth_if_then(x - arr).

因此,objective 函数变为:

div = df['Dividend'].values
div2 = df['Dividend2'].values
ev2 = df['EV2'].values
ev = df['EV'].values
prob = df['Probability'].values

def objective(x, *params):
    ev, bv, vc, dv = x
    div_vals, div2_vals, ev2_vals, ev_vals, prob_vals = params
    number = smooth_if_then(vc - div2_vals) + smooth_if_then(dv - ev2_vals)
    part1 = smooth_if_then(bv - prob_vals) * (number * div_vals - (vc + dv))
    part2 = smooth_if_then(-1*(ev - ev_vals)) * part1
    return -1 * part2.sum()

并使用 trust-constr 算法(这是 scipy.optimize.minimize 中最稳健的算法),产生:

res = minimize(lambda x: objective(x, div, div2, ev2, ev, prob), x0=start, bounds=b1, method="trust-constr")
 barrier_parameter: 1.0240000000000006e-08
 barrier_tolerance: 1.0240000000000006e-08
          cg_niter: 5
      cg_stop_cond: 0
            constr: [array([8.54635975e-01, 5.99253512e+02, 9.95614973e+02, 9.95614973e+02])]
       constr_nfev: [0]
       constr_nhev: [0]
       constr_njev: [0]
    constr_penalty: 1.0
  constr_violation: 0.0
    execution_time: 0.2951819896697998
               fun: 1.3046631387761482e-08
              grad: array([0.00000000e+00, 0.00000000e+00, 8.92175218e-12, 8.92175218e-12])
               jac: [<4x4 sparse matrix of type '<class 'numpy.float64'>'
    with 4 stored elements in Compressed Sparse Row format>]
   lagrangian_grad: array([-3.60651033e-09,  4.89643010e-09,  2.21847918e-09,  2.21847918e-09])
           message: '`gtol` termination condition is satisfied.'
            method: 'tr_interior_point'
              nfev: 20
              nhev: 0
               nit: 14
             niter: 14
              njev: 4
        optimality: 4.896430096425101e-09
            status: 1
           success: True
         tr_radius: 478515625.0
                 v: [array([-3.60651033e-09,  4.89643010e-09,  2.20955743e-09,  2.20955743e-09])]
                 x: array([8.54635975e-01, 5.99253512e+02, 9.95614973e+02, 9.95614973e+02])

最后但同样重要的是:使用平滑近似是实现可微性的常用方法。然而,值得一提的是这些近似值不是凸的。实际上,这意味着您的优化问题不是凸的,因此您无法保证找到的固定点(局部最小值)是全局最优解。为此,需要使用全局优化算法或将问题表述为 MIP。从数学和实践的角度来看,后者是推荐的方法。