from collections import OrderedDict
import numpy as np
from joblib import Parallel, delayed
from pyGPGO.logger import EventLogger
from scipy.optimize import minimize


class GPGO:
    def __init__(self, surrogate, acquisition, f, parameter_dict, n_jobs=1):
        """
        Bayesian Optimization class.

        Parameters
        ----------
        surrogate: Surrogate model instance
            Gaussian Process surrogate model instance.
        acquisition: Acquisition instance
            Acquisition instance.
        f: callable
            Function to maximize over parameters specified by `parameter_dict`.
        parameter_dict: dict
            Dictionary specifying parameters, their types and bounds.
        n_jobs: int. Default 1
            Parallel threads to use during acquisition optimization.

        Attributes
        ----------
        parameter_key: list
            Parameters to consider in the optimization.
        parameter_type: list
            Parameter types.
        parameter_range: list
            Parameter bounds during optimization.
        history: list
            Target values evaluated along the procedure.
        """
        self.GP = surrogate
        self.A = acquisition
        self.f = f
        self.parameters = parameter_dict
        self.n_jobs = n_jobs

        self.parameter_key = list(parameter_dict.keys())
        self.parameter_value = list(parameter_dict.values())
        self.parameter_type = [p[0] for p in self.parameter_value]
        self.parameter_range = [p[1] for p in self.parameter_value]

        self.history = []
        self.logger = EventLogger(self)

    def _sampleParam(self):
        """
        Randomly samples parameters over bounds.

        Returns
        -------
        dict:
            A random sample of specified parameters.
        """
        d = OrderedDict()
        for index, param in enumerate(self.parameter_key):
            if self.parameter_type[index] == 'int':
                d[param] = np.random.randint(
                    self.parameter_range[index][0], self.parameter_range[index][1])
            elif self.parameter_type[index] == 'cont':
                d[param] = np.random.uniform(
                    self.parameter_range[index][0], self.parameter_range[index][1])
            else:
                raise ValueError('Unsupported variable type.')
        return d
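
    # Illustrative note (added, not in the original source): for a hypothetical
    # parameter_dict such as {'x': ('cont', [0, 1]), 'n': ('int', [1, 10])},
    # _sampleParam might return OrderedDict([('x', 0.37), ('n', 4)]); one uniform
    # draw per 'cont' parameter and one integer draw per 'int' parameter, each
    # within its declared bounds.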

    def _firstRun(self, n_eval=3):
        """
        Performs initial evaluations before fitting GP.

        Parameters
        ----------
        n_eval: int
            Number of initial evaluations to perform. Default is 3.
        """
        self.X = np.empty((n_eval, len(self.parameter_key)))
        self.y = np.empty((n_eval,))
        for i in range(n_eval):
            s_param = self._sampleParam()
            s_param_val = list(s_param.values())
            self.X[i] = s_param_val
            self.y[i] = self.f(**s_param)
        self.GP.fit(self.X, self.y)
        self.tau = np.max(self.y)
        self.history.append(self.tau)

    def _acqWrapper(self, xnew):
        """
        Evaluates the acquisition function on a point.

        Parameters
        ----------
        xnew: np.ndarray, shape=(len(self.parameter_key),)
            Point to evaluate the acquisition function on.

        Returns
        -------
        float
            Acquisition function value for `xnew`.
        """
        new_mean, new_var = self.GP.predict(xnew, return_std=True)
        new_std = np.sqrt(new_var + 1e-6)
        # Sign flip: scipy.optimize.minimize is used downstream, so maximizing
        # the acquisition means minimizing its negative.
        return -self.A.eval(self.tau, new_mean, new_std)

    def _optimizeAcq(self, method='L-BFGS-B', n_start=100):
        """
        Optimizes the acquisition function using a multistart approach.

        Parameters
        ----------
        method: str
            Any `scipy.optimize` method that admits bounds and gradients. Default is 'L-BFGS-B'.
        n_start: int
            Number of starting points for the optimization procedure. Default is 100.
        """
        start_points_dict = [self._sampleParam() for i in range(n_start)]
        start_points_arr = np.array([list(s.values())
                                     for s in start_points_dict])
        x_best = np.empty((n_start, len(self.parameter_key)))
        f_best = np.empty((n_start,))
        if self.n_jobs == 1:
            for index, start_point in enumerate(start_points_arr):
                res = minimize(self._acqWrapper, x0=start_point, method=method,
                               bounds=self.parameter_range)
                x_best[index], f_best[index] = res.x, np.atleast_1d(res.fun)[0]
        else:
            opt = Parallel(n_jobs=self.n_jobs)(delayed(minimize)(self._acqWrapper,
                                                                 x0=start_point,
                                                                 method=method,
                                                                 bounds=self.parameter_range)
                                               for start_point in start_points_arr)
            x_best = np.array([res.x for res in opt])
            f_best = np.array([np.atleast_1d(res.fun)[0] for res in opt])
        self.best = x_best[np.argmin(f_best)]
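
    # Design note (added, not in the original source): the acquisition surface is
    # typically multimodal, so a single local search can get stuck. Running
    # `n_start` bounded optimizations from random samples and keeping the run
    # with the lowest negated acquisition value is a cheap way to approximate
    # the global optimum of the acquisition function.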

    def updateGP(self):
        """
        Updates the internal model with the next acquired point and its evaluation.
        """
        kw = {param: self.best[i]
              for i, param in enumerate(self.parameter_key)}
        f_new = self.f(**kw)
        self.GP.update(np.atleast_2d(self.best), np.atleast_1d(f_new))
        self.tau = np.max(self.GP.y)
        self.history.append(self.tau)

    def getResult(self):
        """
        Returns the best result found in the Bayesian Optimization procedure.

        Returns
        -------
        OrderedDict
            Point yielding the best evaluation in the procedure.
        float
            Best function evaluation.
        """
        argtau = np.argmax(self.GP.y)
        opt_x = self.GP.X[argtau]
        res_d = OrderedDict()
        for i, (key, param_type) in enumerate(zip(self.parameter_key, self.parameter_type)):
            if param_type == 'int':
                res_d[key] = int(round(opt_x[i]))
            else:
                res_d[key] = opt_x[i]
        return res_d, self.tau

    def run(self, max_iter=10, init_evals=3, resume=False):
        """
        Runs the Bayesian Optimization procedure.

        Parameters
        ----------
        max_iter: int
            Number of iterations to run. Default is 10.
        init_evals: int
            Initial function evaluations before fitting a GP. Default is 3.
        resume: bool
            Whether to resume the optimization procedure from the last evaluation. Default is `False`.
        """
        if not resume:
            self.init_evals = init_evals
            self._firstRun(self.init_evals)
            self.logger._printInit(self)
        for iteration in range(max_iter):
            self._optimizeAcq()
            self.updateGP()
            self.logger._printCurrent(self)
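

# Minimal usage sketch (added for illustration, not part of the original module).
# It assumes pyGPGO's squaredExponential covariance, GaussianProcess surrogate and
# Acquisition classes are importable under the paths below; the objective `f` and
# its bounds are made up for the example.
if __name__ == '__main__':
    from pyGPGO.covfunc import squaredExponential
    from pyGPGO.acquisition import Acquisition
    from pyGPGO.surrogates.GaussianProcess import GaussianProcess

    def f(x):
        # Toy objective, maximized at x = 0.5.
        return -(x - 0.5) ** 2

    np.random.seed(42)
    sexp = squaredExponential()
    gp = GaussianProcess(sexp)
    acq = Acquisition(mode='ExpectedImprovement')
    param = {'x': ('cont', [0.0, 1.0])}

    gpgo = GPGO(gp, acq, f, param)
    gpgo.run(max_iter=10)
    print(gpgo.getResult())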