Source code for hilo_mpc.modules.machine_learning.gp.gp

#   
#   This file is part of HILO-MPC
#
#   HILO-MPC is a toolbox for easy, flexible and fast development of machine-learning-supported
#   optimal control and estimation problems
#
#   Copyright (c) 2021 Johannes Pohlodek, Bruno Morabito, Rolf Findeisen
#                      All rights reserved
#
#   HILO-MPC is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Lesser General Public License as
#   published by the Free Software Foundation, either version 3
#   of the License, or (at your option) any later version.
#
#   HILO-MPC is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#   GNU Lesser General Public License for more details.
#
#   You should have received a copy of the GNU Lesser General Public License
#   along with HILO-MPC. If not, see <http://www.gnu.org/licenses/>.
#

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, Iterator, Optional, Sequence, Tuple, TypeVar, Union
import warnings

import casadi as ca
import numpy as np
from scipy import stats

from .inference import Inference
from .likelihood import Likelihood
from .mean import Mean
from .kernel import Kernel
from ..base import LearningBase
from ...base import Series, TimeSeries, Equations
from ...optimizer import NonlinearProgram
from ....util.machine_learning import Parameter, Hyperparameter, register_hyperparameters
from ....util.util import convert, is_list_like


Numeric = Union[int, float]
Values = Union[Numeric, np.ndarray]
Bound = Union[Numeric, str, Tuple[Numeric], Tuple[Numeric, Numeric]]
Bounds = Dict[str, Union[str, Tuple[Numeric, Numeric]]]
Array = TypeVar('Array', np.ndarray, ca.DM, ca.SX, ca.MX)
Param = TypeVar('Param', bound=Parameter)
Inf = TypeVar('Inf', bound=Inference)
Lik = TypeVar('Lik', bound=Likelihood)
Mu = TypeVar('Mu', bound=Mean)
Cov = TypeVar('Cov', bound=Kernel)


@dataclass
class Data:
    """Container pairing a numeric training data matrix with CasADi symbols of matching shape"""
    # NOTE: mutable defaults have to be created per instance via default_factory
    values: np.ndarray = field(default_factory=lambda: np.array([[]]))
    SX: ca.SX = field(default_factory=ca.SX)
    MX: ca.MX = field(default_factory=ca.MX)
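
# A minimal sketch of how ``Data`` is used (hypothetical 1x3 training matrix;
# the values shown are illustrative only):
#
#     values = np.array([[0., 1., 2.]])
#     data = Data(values=values, SX=ca.SX.sym('X', 1, 3), MX=ca.MX.sym('X', 1, 3))
#     data.values.shape  # (1, 3)
#     data.SX.shape      # (1, 3)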


class _GPSeries(Series):
    """"""
    def __init__(self, backend: str) -> None:
        super().__init__(backend=backend)

        self._abscissa = 'x'

    def _update_dimensions(self) -> None:
        """

        :return:
        """
        pass


class GaussianProcess(LearningBase):
    """
    Gaussian Process Regression

    :Note: The Gaussian process regressor currently does not use the Cholesky factorization for training and
        prediction. This will be implemented at a later time.

    :param features: names of the features
    :type features: list of strings
    :param labels: names of the labels
    :type labels: list of strings
    :param inference:
    :type inference:
    :param likelihood:
    :type likelihood:
    :param mean:
    :type mean:
    :param kernel: kernel specifying the covariance function of the GP. The kernel hyperparameters are optimized
        during training, defaults to :class:`hilo_mpc.core.learning.kernels.SquaredExponential`.
    :type kernel: kernel
    :param noise_variance:
    :type noise_variance:
    :param hyperprior:
    :type hyperprior:
    :param id:
    :type id: str, optional
    :param name: name of the GP
    :type name: str, optional
    :param solver:
    :type solver: str, optional
    :param solver_options: options passed to the solver, e.g. ipopt
    :type solver_options: dict
    :param kwargs:
    """
    def __init__(
            self,
            features: Union[str, list[str]],
            labels: Union[str, list[str]],
            inference: Optional[Union[str, Inf]] = None,
            likelihood: Optional[Union[str, Lik]] = None,
            mean: Optional[Mu] = None,
            kernel: Optional[Cov] = None,
            noise_variance: Numeric = 1.,
            hyperprior: Optional[str] = None,  # Maybe we should be able to access other hyperpriors from here as well
            id: Optional[str] = None,
            name: Optional[str] = None,
            solver: Optional[str] = None,
            solver_options: Optional[dict] = None,  # TODO: Switch to mapping?
            **kwargs
    ) -> None:
        """Constructor method"""
        if not is_list_like(features):
            features = [features]
        if not is_list_like(labels):
            labels = [labels]
        if len(labels) > 1:
            raise ValueError("Training a GP on multiple labels is not supported. Please use 'MultiOutputGP' to train "
                             "GPs on multiple labels.")
        super().__init__(features, labels, id=id, name=name)

        if likelihood is None:
            likelihood = Likelihood.gaussian()
        elif isinstance(likelihood, str):
            name = likelihood.replace("'", "").replace(' ', '_').lower()
            if name == 'gaussian':
                likelihood = Likelihood.gaussian()
            elif name == 'logistic':
                likelihood = Likelihood.logistic()
            elif name == 'laplacian':
                likelihood = Likelihood.laplacian()
            elif name == 'students_t':
                likelihood = Likelihood.students_t()
            else:
                raise ValueError(f"Likelihood '{likelihood}' not recognized")
        self.likelihood = likelihood

        if inference is None:
            inference = Inference.exact()
        elif isinstance(inference, str):
            name = inference.replace(' ', '_').lower()
            if name == 'exact':
                inference = Inference.exact()
            elif name == 'laplace':
                inference = Inference.laplace()
            elif name == 'expectation_propagation':
                inference = Inference.expectation_propagation()
            elif name == 'variational_bayes':
                inference = Inference.variational_bayes()
            elif name == 'kullback_leibler':
                inference = Inference.kullback_leibler()
            else:
                raise ValueError(f"Inference '{inference}' not recognized")
        self.inference = inference

        if mean is None:
            mean = Mean.zero()
        self.mean = mean

        if kernel is None:
            kernel = Kernel.squared_exponential()
        self.kernel = kernel

        hyper_kwargs = {}
        if hyperprior is not None:
            hyper_kwargs['prior'] = hyperprior
        hyperprior_parameters = kwargs.get('hyperprior_parameters')
        if hyperprior_parameters is not None:
            hyper_kwargs['prior_parameters'] = hyperprior_parameters
        bounds = kwargs.get('bounds')
        if bounds is not None:  # pragma: no cover
            variance_bounds = bounds.get('noise_variance')
            if variance_bounds is not None:
                if variance_bounds == 'fixed':
                    hyper_kwargs['fixed'] = True
                else:
                    hyper_kwargs['bounds'] = bounds
        self.noise_variance = Hyperparameter('GP.noise_variance', value=noise_variance, **hyper_kwargs)

        self._hyp_is_log = {'GP.noise_variance': True}
        self._hyp_is_log.update({parameter.name: False for parameter in self.mean.hyperparameters})
        self._hyp_is_log.update({parameter.name: True for parameter in self.kernel.hyperparameters})
        register_hyperparameters(self, [parameter.id for parameter in self.hyperparameters])

        # bounds = self.noise_variance.log_bounds
        # unbounded_noise = bounds[0] == -ca.inf and bounds[1] == ca.inf
        # unconstrained_op = all([not self.kernel.is_bounded(), unbounded_noise])
        unconstrained_op = True
        if solver is None:
            if unconstrained_op:
                solver = 'Newton-CG'
            else:  # pragma: no cover
                # TODO: Test GP for constrained hyperparameters
                solver = 'L-BFGS-B'
        self._solver = solver
        if solver == 'ipopt':
            if solver_options is None:
                solver_options = {'print_time': False, 'ipopt.suppress_all_output': 'yes'}
                # solver_options.update({'ipopt.hessian_approximation': 'limited-memory'})
        else:
            if solver_options is None:
                solver_options = {}
        self._solver_options = solver_options

        self._X_train = Data()
        self._y_train = Data()

        epsilon = kwargs.get('epsilon')
        if epsilon is None:
            epsilon = 1e-8
        self._epsilon = epsilon

        self._where_is_what = {}
        self._log_marginal_likelihood = None
        self._gp_solver = None
        self._gp_args = {}
        self._optimization_stats = {}

    def __str__(self) -> str:
        """String representation method"""
        message = "Gaussian process with \n"
        for attribute, value in self.__dict__.items():
            if attribute[0] != '_':
                message += f"\t {attribute}: {value} \n"
        return message

    def _initialize_solver(self) -> None:
        """

        :return:
        """
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter('always')
            self._gp_solver = NonlinearProgram(solver=self._solver, solver_options=self._solver_options)
        # Re-emit all caught warnings except the suppressed plot-backend notice
        for warning in w:
            if issubclass(warning.category, UserWarning):
                if warning.message.args[0] == 'Plots are disabled, since no backend was selected.':
                    continue
            warnings.warn(warning.message, warning.category)

    def _update_gp_args(
            self,
            name: str,
            value: Values,
            bounds: Optional[Bound]
    ) -> None:
        """

        :param name:
        :param value:
        :param bounds:
        :return:
        """
        if self._gp_solver is not None:
            x_or_p, index = self._where_is_what[name]
            is_slice = isinstance(index, slice)

            if hasattr(value, 'shape'):
                shape = value.shape
                if not shape:
                    n_val = 1
                else:
                    n_dim = len(shape)
                    if n_dim == 1:
                        n_val = shape[0]
                    elif n_dim == 2:
                        if 1 in shape:
                            n_val = shape[0] * shape[1]
                        else:
                            raise ValueError("Dimension mismatch. Expected 1 dimension, got 2.")
                    else:
                        raise ValueError(f"Dimension mismatch. Expected 1 dimension, got {n_dim}.")
            else:
                n_val = 1

            if (is_slice and n_val != index.stop - index.start) or (is_slice and n_val == 1) or (
                    not is_slice and n_val > 1):
                # TODO: Raise error here?
                msg = f"Dimension of hyperparameter {name} changed. Run the setup() method again"
                warnings.warn(msg)
            else:
                if self._hyp_is_log[name]:
                    if 'variance' in name:
                        self._gp_args[x_or_p][index] = ca.log(value) / 2
                    else:
                        self._gp_args[x_or_p][index] = ca.log(value)
                else:
                    self._gp_args[x_or_p][index] = value

                if bounds is not None:  # pragma: no cover
                    if bounds == 'fixed':
                        if n_val > 1:
                            raise ValueError("Dimension mismatch between values and bounds. Supplied values have a "
                                             "dimension bigger than 1.")
                        lb = value
                        ub = value
                    elif isinstance(bounds, (int, float)):
                        lb = bounds
                        ub = ca.inf
                    elif len(bounds) == 1:
                        lb = bounds[0]
                        ub = ca.inf
                    elif len(bounds) == 2:
                        lb = bounds[0]
                        ub = bounds[1]
                        if lb > ub:
                            lb, ub = ub, lb
                    else:
                        raise ValueError("Bounds of unsupported type")
                    if lb is not None:
                        self._gp_args['lbx'][index] = lb
                    if ub is not None:
                        self._gp_args['ubx'][index] = ub

    @property
    def X_train(self) -> Data:
        """
        The returned :class:`Data` object contains the training data matrix as ndarray of shape
        (number_features, number_observations) and its respective CasADi SX and MX symbols of the same dimensions,
        which are used to define the CasADi functions during fitting and prediction. It can be changed by simply
        assigning an ndarray; the setter method will automatically adjust the symbols.

        :return:
        """
        return self._X_train

    @X_train.setter
    def X_train(self, value: np.ndarray) -> None:
        shape = value.shape
        if shape[0] != self._n_features:
            raise ValueError(f"Dimension mismatch. Supplied dimension for the features is {shape[0]}, but "
                             f"required dimension is {self._n_features}.")
        X_train_SX = ca.SX.sym('X', *shape)
        X_train_MX = ca.MX.sym('X', *shape)
        self._X_train = Data(values=value, SX=X_train_SX, MX=X_train_MX)

    @property
    def y_train(self) -> Data:
        """
        The returned :class:`Data` object contains the training target matrix as ndarray of shape
        (number_labels, number_observations) and its respective CasADi SX and MX symbols of the same dimensions,
        which are used to define the CasADi functions during fitting and prediction. It can be changed by simply
        assigning an ndarray; the setter method will automatically adjust the symbols.

        :return:
        """
        return self._y_train

    @y_train.setter
    def y_train(self, value: np.ndarray) -> None:
        shape = value.shape
        if shape[0] != self._n_labels:
            raise ValueError(f"Dimension mismatch. Supplied dimension for the labels is {shape[0]}, but required "
                             f"dimension is {self._n_labels}.")
        y_train_SX = ca.SX.sym('y', *shape)
        y_train_MX = ca.MX.sym('y', *shape)
        self._y_train = Data(values=value, SX=y_train_SX, MX=y_train_MX)

    def set_training_data(self, X: np.ndarray, y: np.ndarray) -> None:
        """Sets the training matrix and its target vector

        :param X:
        :param y:
        :return:
        """
        if self._function is not None:
            warnings.warn("Gaussian process was already executed. Use the fit_model() method again to optimize with "
                          "respect to the newly set training data.")

        new_X_shape = X.shape
        new_y_shape = y.shape
        if new_X_shape[1] != new_y_shape[1]:
            raise ValueError("Number of observations in training matrix and target vector do not match!")

        old_X_shape = self._X_train.values.shape
        old_y_shape = self._y_train.values.shape
        self.X_train = X
        self.y_train = y

        if self._gp_solver is not None:
            if old_X_shape == new_X_shape and old_y_shape == new_y_shape:
                # The head of the parameter vector holds the flattened training data
                n_x = new_X_shape[0] * new_X_shape[1]
                n_y = new_y_shape[0] * new_y_shape[1]
                self._gp_args['p'][:n_x] = X.flatten()
                self._gp_args['p'][n_x:n_x + n_y] = y.flatten()
            else:
                warnings.warn("Dimensions of training data set changed. Please run setup() method again.")

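    # Usage sketch (hypothetical data): features are stacked along rows and
    # observations along columns, matching the (number_features,
    # number_observations) convention enforced by the X_train/y_train setters:
    #
    #     X = np.linspace(0., 10., 20).reshape(1, -1)  # 1 feature, 20 observations
    #     y = np.sin(X)                                # 1 label, 20 observations
    #     gp.set_training_data(X, y)
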
    @property
    def hyperparameters(self) -> list[Param]:
        """
        Includes the noise variance hyperparameter as well as all hyperparameters of the mean function and the kernel

        :return:
        """
        return [self.noise_variance] + self.mean.hyperparameters + self.kernel.hyperparameters

    @property
    def hyperparameter_names(self) -> list[str]:
        """

        :return:
        """
        return [self.noise_variance.name] + self.mean.hyperparameter_names + self.kernel.hyperparameter_names

    def get_hyperparameter_by_name(self, name: str) -> Param:
        """

        :param name:
        :return:
        """
        return [parameter for parameter in self.hyperparameters if parameter.name == name][0]

    def update(self, *args) -> None:
        """

        :param args:
        :return:
        """
        self._update_gp_args(*args)

    def update_hyperparameters(
            self,
            names: Sequence[str],
            values: Optional[Union[Sequence[Optional[Values]], np.ndarray]] = None,
            bounds: Optional[Sequence[Optional[Bounds]]] = None
    ) -> None:
        """
        Updates hyperparameter values and boundaries

        If names of hyperparameters are found in the given dictionaries, their respective value and/or boundaries
        will be updated to the value and/or boundaries defined in the dictionary.

        :Note: Instead of boundary values the string 'fixed' can be passed, which flags the hyperparameter, keeping
            it constant during any optimization routine. This flag will be automatically reset if a list of two
            floats is passed.

        :param names:
        :param values:
        :param bounds:
        :return:
        """
        for name in names:
            parameter = self.get_hyperparameter_by_name(name)
            _, index = self._where_is_what[name]
            if values is not None:
                if hasattr(values, 'ndim') and values.ndim != 1:
                    raise ValueError("At the moment only flat NumPy arrays are supported for updating the "
                                     "hyperparameter values.")
                value = values[index]
                if value is not None:
                    if 'variance' in name:
                        value *= 2
                    if not parameter.fixed:
                        if self._hyp_is_log[name]:
                            value = np.exp(value)
                        parameter.value = value
            if bounds is not None:  # pragma: no cover
                bound = bounds[index]
                if bound is not None:
                    parameter.bounds = bound

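    # Worked sketch of the log-transform convention used above: a variance
    # hyperparameter sigma^2 is optimized as log(sigma), since
    # log(sigma^2) / 2 == log(sigma). Reading an optimizer result back hence
    # doubles it before exponentiating:
    #
    #     log_sigma = 0.5 * np.log(4.0)       # internal representation
    #     variance = np.exp(2.0 * log_sigma)  # recovers sigma^2 == 4.0
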
    def initialize(
            self,
            features: Union[str, list[str]],
            labels: Union[str, list[str]],
            inference: Optional[Union[str, Inf]] = None,
            likelihood: Optional[Union[str, Lik]] = None,
            mean: Optional[Mu] = None,
            kernel: Optional[Cov] = None,
            noise_variance: Numeric = 1.,
            hyperprior: Optional[str] = None,  # Maybe we should be able to access other hyperpriors from here as well
            id: Optional[str] = None,
            name: Optional[str] = None,
            solver: Optional[str] = None,
            solver_options: Optional[dict] = None,  # TODO: Switch to mapping?
            **kwargs
    ) -> None:
        """

        :param features:
        :param labels:
        :param inference:
        :param likelihood:
        :param mean:
        :param kernel:
        :param noise_variance:
        :param hyperprior:
        :param id:
        :param name:
        :param solver:
        :param solver_options:
        :param kwargs:
        :return:
        """
        # NOTE: Maybe we could abuse the __call__-method and check in there whether the class was already instantiated
        # (https://stackoverflow.com/questions/40457599/how-to-check-if-an-object-has-been-initialized-in-python/40459563,
        # https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python).
        # If the object was already instantiated, we execute the normal __call__-method.
        self.__init__(features, labels, inference=inference, likelihood=likelihood, mean=mean, kernel=kernel,
                      noise_variance=noise_variance, hyperprior=hyperprior, id=id, name=name, solver=solver,
                      solver_options=solver_options, **kwargs)

    def setup(self, **kwargs) -> None:
        """

        :param kwargs:
        :return:
        """
        X_sym = self._X_train.SX
        if X_sym.is_empty():
            raise RuntimeError("The training data has not been set. Please run the method set_training_data() to "
                               "proceed.")
        y_sym = self._y_train.SX
        if y_sym.is_empty():
            # NOTE: We shouldn't get here
            raise RuntimeError("The training data has not been set. Please run the method set_training_data() to "
                               "proceed.")

        n, D = X_sym.shape
        X = ca.SX.sym('X', n)
        posterior = self.inference(X_sym, y_sym, X, self.noise_variance.SX, self.likelihood, self.mean, self.kernel)
        log_marginal_likelihood = posterior['log_marginal_likelihood']
        mean = posterior['mean']
        var = posterior['var']

        hyperparameters_to_optimize = [parameter for parameter in self.hyperparameters if not parameter.fixed]
        w = []
        w0 = []
        # lbw = []
        # ubw = []
        k = 0
        for parameter in hyperparameters_to_optimize:
            name = parameter.name
            param = parameter.SX
            hyperprior = parameter.prior
            if hyperprior is not None:
                log_marginal_likelihood += hyperprior(param, log=True)
            w.append(param)
            n_p = param.numel()
            if self._hyp_is_log[name]:
                if 'variance' in name:
                    w0.append(parameter.log / 2)
                else:
                    w0.append(parameter.log)
                # lbw.append(n_p * [parameter.log_bounds[0]])
                # ubw.append(n_p * [parameter.log_bounds[1]])
            else:
                w0.append(parameter.value)
                # lbw.append(n_p * [parameter.bounds[0]])
                # ubw.append(n_p * [parameter.bounds[1]])
            if n_p == 1:
                self._where_is_what[name] = ('x0', k)
            else:
                self._where_is_what[name] = ('x0', slice(k, k + n_p))
            k += n_p

        hyperparameters_fixed = [parameter for parameter in self.hyperparameters if parameter.fixed]
        p = []
        p0 = []
        # Fixed hyperparameters start right after the flattened training data in p,
        # i.e. after the n * D entries of X_train and the D entries of y_train
        k = (n + 1) * D
        for parameter in hyperparameters_fixed:
            name = parameter.name
            p.append(parameter.SX)
            n_p = parameter.SX.numel()
            if self._hyp_is_log[name]:
                if 'variance' in name:
                    p0.append(parameter.log / 2)
                else:
                    p0.append(parameter.log)
            else:
                p0.append(parameter.value)
            if n_p == 1:
                self._where_is_what[name] = ('p', k)
            else:
                self._where_is_what[name] = ('p', slice(k, k + n_p))
            k += n_p

        w = ca.vertcat(*w)
        w0 = ca.vertcat(*w0)
        # lbw = ca.vertcat(*lbw)
        # ubw = ca.vertcat(*ubw)
        p = ca.vertcat(X_sym.T[:], y_sym.T[:], *p)
        # TODO: Check if this is actually the same as what is done for the SX variables
        p0 = np.concatenate([self.X_train.values.flatten(), self.y_train.values.flatten(), p0])

        self._initialize_solver()
        # for some reason this doesn't result in free variables
        # self._gp_solver.set_decision_variables(w, lower_bound=lbw, upper_bound=ubw)
        self._gp_solver.set_decision_variables(w)
        self._gp_solver.set_parameters(p)
        self._gp_solver.set_objective(-log_marginal_likelihood)
        self._log_marginal_likelihood = ca.Function('log_marginal_likelihood', [w, p], [log_marginal_likelihood],
                                                    ['x0', 'p'], ['log_marg_lik'])
        self._function = ca.Function(
            'prediction',
            [X, w, p],
            [mean, var],
            ['X', 'x0', 'p'],
            ['mean', 'variance']
        )
        self._gp_solver.setup()
        self._gp_solver.set_initial_guess(w0)
        self._gp_solver.set_parameter_values(p0)

        self._gp_args.update({
            'x0': w0,
            'p': p0,
            # 'lbx': lbw,
            # 'ubx': ubw
        })

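    # Layout of the NLP assembled in setup() (descriptive summary, not new
    # behavior): the decision vector 'x0' stacks the free hyperparameters
    # (log-transformed where _hyp_is_log is True), while the parameter vector
    # 'p' stacks [vec(X_train), vec(y_train), fixed hyperparameters].
    # _where_is_what maps each hyperparameter name to ('x0' or 'p', index or
    # slice) so that later updates can write to the right entries.
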
    def is_setup(self) -> bool:
        """

        :return:
        """
        return self._gp_solver is not None

    def log_marginal_likelihood(self) -> float:
        """

        :return:
        """
        return float(self._log_marginal_likelihood(x0=self._gp_args['x0'], p=self._gp_args['p'])['log_marg_lik'])

    def fit_model(self) -> None:
        """
        Optimizes the hyperparameters by minimizing the negative log marginal likelihood.

        The model fit is the training phase in GP regression. Given the design data (training observations and their
        targets) as well as suitable start values and bounds for the hyperparameters, the hyperparameters are adapted
        by minimizing the negative log marginal likelihood.

        :Note: Instead of boundary values the string 'fixed' can be passed, which flags the hyperparameter, keeping
            it constant during the optimization routine.

        :return:
        """
        if self._gp_solver is None:
            raise RuntimeError("The GP has not been set up yet. Please run the setup() method before fitting.")
        self._gp_solver.solve()
        solution = self._gp_solver.solution
        names = [parameter.name for parameter in self.hyperparameters if not parameter.fixed]
        values = solution.get_by_id('x:f').full().flatten()
        self.update_hyperparameters(names, values=values)
        self._optimization_stats = self._gp_solver.stats()
        if not self._optimization_stats['success']:  # pragma: no cover
            if self._solver == 'ipopt':
                return_status = self._optimization_stats['return_status']
                if return_status == 'Infeasible_Problem_Detected':
                    message = 'Infeasible problem detected'
                elif return_status == 'Restoration_Failed':
                    message = 'Restoration failed'
                elif return_status == 'Maximum_Iterations_Exceeded':
                    message = 'Maximum iterations exceeded'
                else:
                    raise RuntimeError(f"Unrecognized return status: {return_status}")
            else:
                message = self._optimization_stats['message']
            warnings.warn(f"Fitting of GP didn't terminate successfully\nSolver message: {message}\n"
                          f"Try to use a different solver")

    def predict(self, X_query: Array, noise_free: bool = False) -> Tuple[Array, Array]:
        """

        :param X_query:
        :param noise_free:
        :return:
        """
        if self._function is None:
            raise RuntimeError("The GP has not been set up yet. Please run the setup() method before predicting.")
        prediction = self._function(X=X_query, x0=self._gp_args['x0'], p=self._gp_args['p'])
        mean, var = prediction['mean'], prediction['variance']
        if not noise_free:
            var += self.noise_variance.value
        if isinstance(X_query, np.ndarray):
            mean = mean.full()
            var = var.full()
        return mean, var

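    # Usage sketch (hypothetical query points): predict() accepts NumPy arrays
    # as well as CasADi matrices, so it can also be embedded in symbolic
    # expressions. With NumPy input both outputs are dense NumPy arrays:
    #
    #     X_query = np.array([[0.5, 1.5]])                  # (n_features, n_queries)
    #     mean, var = gp.predict(X_query)                   # variance includes noise
    #     mean, var = gp.predict(X_query, noise_free=True)  # latent function only
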
    def predict_quantiles(
            self,
            quantiles: Optional[tuple[Numeric, Numeric]] = None,
            X_query: Optional[np.ndarray] = None,
            mean: Optional[np.ndarray] = None,
            var: Optional[np.ndarray] = None
    ) -> Optional[tuple[np.ndarray, np.ndarray]]:
        """

        :param quantiles:
        :param X_query:
        :param mean:
        :param var:
        :return:
        """
        if quantiles is None:
            quantiles = (2.5, 97.5)
        if X_query is not None:
            mean, var = self.predict(X_query, noise_free=True)
        elif mean is None and var is None:
            return None
        predictive_quantiles = [stats.norm.ppf(q / 100) * np.sqrt(var + float(self.noise_variance.value)) + mean
                                for q in quantiles]
        return predictive_quantiles[0], predictive_quantiles[1]

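    # The quantiles above follow directly from the Gaussian predictive
    # distribution: q_p = mean + Phi^{-1}(p / 100) * sqrt(var + noise_variance),
    # so the default pair (2.5, 97.5) brackets a 95% prediction interval.
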
    def plot(self, X_query: Array, backend: Optional[str] = None, **kwargs) -> None:
        """

        :param X_query:
        :param backend:
        :param kwargs:
        :return:
        """
        X_train = self._X_train.values
        y_train = self._y_train.values
        n_x = self._n_features

        post_mean, _ = self.predict(X_query, noise_free=True)

        solution = _GPSeries(backend=backend)
        if backend is None:
            backend = solution.plot_backend
        names = ['x', 'y']
        vector = {
            'x': {
                'values_or_names': self._features,
                'description': n_x * [''],
                'labels': self._features,
                'units': n_x * [''],
                'shape': (n_x, 0),
                'data_format': ca.DM
            },
            'y': {
                'values_or_names': ['post. mean at test points'],
                'description': [''],
                'labels': self._labels,
                'units': [''],
                'shape': (1, 0),
                'data_format': ca.DM
            }
        }
        solution.setup(*names, **vector)
        solution.set('x', X_query)
        solution.set('y', post_mean)

        plots = self._n_labels * tuple((feature, 'post. mean at test points') for feature in self._features)
        ext_data_x = []
        ext_data_y = []
        for i in range(self._n_labels):
            for j in range(n_x):
                ext_data_x.append({'data': X_train[j, :], 'subplot': i * n_x + j, 'label': 'training points'})
                ext_data_y.append(
                    {
                        'data': y_train[i, :],
                        'subplot': i * n_x + j,
                        'label': 'training points',
                        'kind': 'scatter',
                        'marker': '+',
                        'marker_size': 20 if backend == 'bokeh' else 60
                    }
                )

        plot_kwargs = kwargs.copy()
        if backend == 'bokeh':
            if kwargs.get("output_notebook", False):
                plot_kwargs["figsize"] = kwargs.get("figsize", (300, 300))
            else:
                plot_kwargs["figsize"] = kwargs.get("figsize", (500, 500))
            plot_kwargs["major_label_text_font_size"] = kwargs.get("major_label_text_font_size", "12pt")
            plot_kwargs["axis_label_text_font_size"] = kwargs.get("axis_label_text_font_size", "12pt")
        plot_kwargs['marker'] = kwargs.get('marker', 'o')
        if backend == 'bokeh':
            plot_kwargs['marker_size'] = kwargs.get('marker_size', 5)
        elif backend == 'matplotlib':
            plot_kwargs['marker_size'] = kwargs.get('marker_size', 20)
        plot_kwargs['title'] = kwargs.get('title', n_x * ('', ))
        plot_kwargs['legend'] = kwargs.get('legend', True)
        plot_kwargs['kind'] = 'scatter'

        solution.plot(*plots, x_data=ext_data_x, y_data=ext_data_y, **plot_kwargs)

    def plot_1d(
            self,
            quantiles: Optional[tuple[Numeric, Numeric]] = None,
            resolution: Optional[int] = 200,
            backend: Optional[str] = None,
            **kwargs
    ) -> None:
        """

        :param quantiles:
        :param resolution:
        :param backend:
        :param kwargs:
        :return:
        """
        X_train = self._X_train.values
        y_train = self._y_train.values
        n_x = X_train.shape[0]
        if n_x > 1:
            raise RuntimeError("The method plot_1d is only supported for Gaussian processes with one feature.")

        x_min = X_train.min()
        x_max = X_train.max()
        x_min, x_max = x_min - 0.25 * (x_max - x_min), x_max + 0.25 * (x_max - x_min)
        X = np.linspace([float(x_min)], [float(x_max)], resolution, axis=1)
        y, var = self.predict(X, noise_free=True)
        lb, ub = self.predict_quantiles(quantiles=quantiles, mean=y, var=var)

        solution = _GPSeries(backend=backend)
        if backend is None:
            backend = solution.plot_backend
        names = ['x', 'y']
        vector = {
            'x': {
                'values_or_names': self._features,
                'description': [''],
                'labels': self._features,
                'units': [''],
                'shape': (1, 0),
                'data_format': ca.DM
            },
            'y': {
                'values_or_names': ['mean'],
                'description': [''],
                'labels': self._labels,
                'units': [''],
                'shape': (1, 0),
                'data_format': ca.DM
            }
        }
        solution.setup(*names, **vector)
        solution.set('x', X)
        solution.set('y', y)

        ext_data_x = [{'data': X_train, 'subplot': 0, 'label': 'data'}]
        ext_data_y = [{'data': y_train, 'subplot': 0, 'label': 'data', 'kind': 'scatter'}]

        plot_kwargs = kwargs.copy()
        if backend == 'bokeh':
            if kwargs.get("output_notebook", False):
                plot_kwargs["figsize"] = kwargs.get("figsize", (300, 300))
            else:
                plot_kwargs["figsize"] = kwargs.get("figsize", (500, 500))
            plot_kwargs["line_width"] = kwargs.get("line_width", 2)
            plot_kwargs["major_label_text_font_size"] = kwargs.get("major_label_text_font_size", "12pt")
            plot_kwargs["axis_label_text_font_size"] = kwargs.get("axis_label_text_font_size", "12pt")
        plot_kwargs['color'] = kwargs.get('color', ['#3465a4', 'k'])
        plot_kwargs['marker'] = kwargs.get('marker', 'o')
        if backend == 'bokeh':
            plot_kwargs['marker_size'] = kwargs.get('marker_size', 15)
        elif backend == 'matplotlib':
            plot_kwargs['marker_size'] = kwargs.get('marker_size', 60)
        plot_kwargs['fill_between'] = [
            {'x': self._features[0], 'lb': lb, 'ub': ub, 'label': 'confidence', 'line_color': '#204a87',
             'line_width': .5, 'fill_color': '#729fcf', 'fill_alpha': .2}
        ]
        plot_kwargs['title'] = kwargs.get('title', "GP regression")
        plot_kwargs['legend'] = kwargs.get('legend', True)

        solution.plot((self._features[0], 'mean'), x_data=ext_data_x, y_data=ext_data_y, **plot_kwargs)

    def plot_prediction_error(
            self,
            X_query: Array,
            y_query: Array,
            noise_free: bool = False,
            backend: Optional[str] = None,
            **kwargs
    ) -> None:
        """

        :param X_query:
        :param y_query:
        :param noise_free:
        :param backend:
        :param kwargs:
        :return:
        """
        post_mean, post_var = self.predict(X_query, noise_free=noise_free)
        error = np.absolute(post_mean - y_query)
        post_std = np.sqrt(post_var)
        n_samples = X_query.shape[1]

        solution = TimeSeries(backend=backend)
        if backend is None:
            backend = solution.plot_backend
        names = ['t', 'x']
        vector = {
            't': {
                'values_or_names': ['t'],
                'description': [''],
                'labels': [''],
                'units': [''],
                'shape': (1, 0),
                'data_format': ca.DM
            },
            'x': {
                'values_or_names': ['error', 'post_std'],
                'description': 2 * [''],
                'labels': 2 * [''],
                'units': 2 * [''],
                'shape': (2, 0),
                'data_format': ca.DM
            }
        }
        solution.setup(*names, **vector)
        solution.set('t', np.linspace([0.], [n_samples - 1], n_samples, axis=1))
        solution.set('x', np.append(error, post_std, axis=0))

        plot_kwargs = kwargs.copy()
        plot_kwargs['title'] = ('prediction error', 'prediction function standard deviation')
        plot_kwargs['xlabel'] = ('', '')
        plot_kwargs['ylabel'] = ('', '')
        plot_kwargs['kind'] = 'scatter'
        plot_kwargs['marker'] = 'o'
        plot_kwargs['marker_size'] = 5 if backend == 'bokeh' else 20

        solution.plot(('t', 'error'), ('t', 'post_std'), **plot_kwargs)


class GPArray:
    """Fixed-size array of Gaussian processes"""
    def __init__(self, n_gps: int) -> None:
        """Constructor method"""
        self._n_gps = n_gps
        self._gps = np.empty((self._n_gps, 1), dtype=object)

    # https://stackoverflow.com/questions/38229953/array-and-rmul-operator-in-python-numpy
    __array_ufunc__ = None

    def __len__(self) -> int:
        """Length method"""
        return self._n_gps

    def __iter__(self) -> Iterator[GaussianProcess]:
        """Item iteration method"""
        for k in range(self._n_gps):
            gp = self._gps[k, 0]
            if gp is None:
                gp = object.__new__(GaussianProcess)
                self._gps[k, 0] = gp
            yield gp

    def __getitem__(self, item: int) -> Optional[GaussianProcess]:
        """Item getter method"""
        return self._gps[item, 0]

    def __rmatmul__(self, other: Array) -> ca.Function:
        """Matrix multiplication method (from the right side)"""
        other = convert(other, ca.DM)
        features = []
        labels = ca.SX.zeros(self._n_gps)
        for k in range(self._n_gps):
            gp = self._gps[k, 0]
            query = [ca.SX.sym(name) for name in gp.features]
            out, _ = gp.predict(ca.vertcat(*query))
            features += query
            labels[k] += out
        # NOTE: @-operator doesn't seem to work because we set __array_ufunc__ to None
        equations = Equations(ca.SX, {'matmul': ca.mtimes(other, labels)})
        return equations.to_function(f'{GaussianProcess.__name__}_matmul', *features)

    # def append(self, gp: GaussianProcess) -> None:
    #     """
    #
    #     :param gp:
    #     :return:
    #     """
    #
    # def pop(self) -> GaussianProcess:
    #     """
    #
    #     :return:
    #     """
    #
    # def remove(self, item) -> None:
    #     """
    #
    #     :param item:
    #     :return:
    #     """


__all__ = [
    'GaussianProcess',
    'GPArray'
]
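
# Minimal end-to-end sketch of the workflow implemented in this module
# (hypothetical one-dimensional data; 'x' and 'y' are illustrative names):
#
#     gp = GaussianProcess('x', 'y')
#     X = np.linspace(0., 2. * np.pi, 25).reshape(1, -1)
#     gp.set_training_data(X, np.sin(X))
#     gp.setup()
#     gp.fit_model()  # maximizes the log marginal likelihood
#     mean, var = gp.predict(np.array([[1.]]))
#     lb, ub = gp.predict_quantiles(X_query=np.array([[1.]]))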