Source code for hilo_mpc.modules.estimator.mhe

#   
#   This file is part of HILO-MPC
#
#   HILO-MPC is a toolbox for easy, flexible and fast development of machine-learning-supported
#   optimal control and estimation problems
#
#   Copyright (c) 2021 Johannes Pohlodek, Bruno Morabito, Rolf Findeisen
#                      All rights reserved
#
#   HILO-MPC is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Lesser General Public License as
#   published by the Free Software Foundation, either version 3
#   of the License, or (at your option) any later version.
#
#   HILO-MPC is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#   GNU Lesser General Public License for more details.
#
#   You should have received a copy of the GNU Lesser General Public License
#   along with HILO-MPC. If not, see <http://www.gnu.org/licenses/>.
#

from copy import deepcopy
import warnings

import casadi as ca
import casadi.tools as catools
import numpy as np

from .base import Estimator
from ..optimizer import DynamicOptimization
from ...util.modeling import MHEQuadraticCost, GenericConstraint, continuous2discrete
from ...util.util import check_and_wrap_to_list, check_and_wrap_to_DM, check_if_list_of_none, check_if_list_of_string, \
    scale_vector, who_am_i


class MovingHorizonEstimator(Estimator, DynamicOptimization):
    """
    Moving Horizon Estimator (MHE) class for state and parameter estimation.

    :param model: HILO-MPC model object that will be used in the estimator.
    :type model: HILO-MPC Model
    :param id: id name
    :type id: str, optional
    :param name: The name of the MHE. By default the MHE has no name.
    :type name: str, optional
    :param plot_backend: Plot package. Can be 'bokeh' (recommended) or 'matplotlib' (default: 'matplotlib').
    :type plot_backend: str, optional
    :param time: Initial time. Useful for time-varying systems (default: 0).
    :type time: float or int, optional

    NLP options (see :meth:`set_nlp_options`):

    ====================  ===================================
    Option                Description
    ====================  ===================================
    arrival_guess_update  Can be 'smoothing' or 'filtering'
    ====================  ===================================
    """
    def __init__(self, model, id=None, name=None, plot_backend=None, time=0) -> None:
        """Constructor method"""
        super().__init__(model, id=id, name=name, plot_backend=plot_backend)

        self._nlp_options_is_set = False
        self._arrival_term_flag = False
        self._stage_term_flag = False
        self._state_noise_flag = False
        self._horizon_is_set = False
        self._box_constraints_is_set = False
        self._initial_guess_is_set = False
        self._integration_opts_is_set = False
        self._nlp_opts_is_set = False
        self._nlp_solver_is_set = False
        self._stage_constraints_flag = False
        self._custom_constraints_flag = False
        self._change_mode_term_flat = False
        self._scaling_is_set = False
        self._time_varying_parameters_is_set = False
        self._sampling_time_is_set = False
        self._soft_constraints_flag = False
        self._mixed_integer_flag = False
        self._mode_guess_is_set = False
        self._stage_term = 0
        self._arrival_term = 0
        self._horizon_is_reached = False

        # Initialize the costs
        self.quad_stage_cost = MHEQuadraticCost(self._model)
        self.quad_arrival_cost = MHEQuadraticCost(self._model)

        self._horizon = int()

        # These will be populated with CasADi symbolics representing the measurements and the state noise,
        # respectively
        self._y_meas = ca.SX.sym('y', 0)
        self._w = ca.SX.sym('w', 0)

        self._time_varying_parameters_ind = None

        # Clock time
        self._time = time

        if self._model.n_y == 0:
            warnings.warn(
                f"The model has no measurement equations. Assuming measurements of all states "
                f"{self._model.dynamical_state_names} are available."
            )
            self._meas_eq_exist = False
        else:
            self._meas_eq_exist = True

        if self._meas_eq_exist:
            meas_names = self._model.measurement_names
        else:
            meas_names = self._model.dynamical_state_names

        for s in meas_names:
            s_w = ca.SX.sym(s + '_meas')
            self._y_meas = ca.vertcat(*[self._y_meas, s_w])

        # Histories of the output, input and time-varying parameter measurements
        self._y_history = []
        self._u_history = []
        self._tvp_history = []

        # Counter for the number of measurements collected
        self._meas_counter = 0
        self.meas_num = len(meas_names)

        self.change_input_term = []
        self.x_ub = []
        self.x_lb = []
        self.x_guess = []
        self.w_ub = []
        self.w_lb = []
        self.w_guess = []
        self._p_ind = []
        self._x_ind = []
        self._w_ind = []

        # Initialize generic stage and terminal constraints
        self.stage_constraint = GenericConstraint(self._model)

        self.time_varying_parameters = []
        self.x_scaling = []
        self.w_scaling = []

    def _update_type(self) -> None:
        """Set the estimator type identifier to 'MHE'."""
        self._type = 'MHE'

    def _define_cost_terms(self):
        """
        Creates the SX expressions for the stage and arrival cost terms.

        :return:
        """
        self._state_noise_flag = self.quad_stage_cost._has_state_noise
        if self._state_noise_flag:
            self._w = self.quad_stage_cost.w

        self.quad_stage_cost._setup(x_scale=self._x_scaling, u_scale=self._u_scaling, p_scale=self._p_scaling,
                                    w_scale=self._w_scaling)
        if self.quad_stage_cost._is_set:
            self._stage_term += self.quad_stage_cost.cost
            self._stage_term_flag = True

        self.quad_arrival_cost._setup(x_scale=self._x_scaling, u_scale=self._u_scaling, p_scale=self._p_scaling,
                                      w_scale=self._w_scaling)
        if self.quad_arrival_cost._is_set:
            self._arrival_term += self.quad_arrival_cost.cost
            self._arrival_term_flag = True

    def _check_measurements(self, y, u):
        """Validate measurement dimensions and wrap them to lists.

        :param y: Output measurements
        :param u: Input measurements (optional)
        :return: Validated and wrapped measurements (y, u)
        :raises ValueError: If measurement dimensions don't match the model
        """
        y = check_and_wrap_to_list(y)
        if not len(y) == self.meas_num:
            raise ValueError(
                f'You passed {len(y)} output measurements but the measurement equation has {self.meas_num} outputs.')
        if u is not None:
            u = check_and_wrap_to_list(u)
            if not len(u) == self._model.n_u:
                raise ValueError(
                    f'You passed {len(u)} input measurements but the model has {self._model.n_u} inputs.')
        return y, u

    def _check_mhe_is_well_posed(self):
        """Verify that all required MHE components are properly configured.

        :raises ValueError: If cost functions, horizon, or constraint dimensions are invalid
        """
        # TODO: make sure all the important things are checked
        if self._stage_term_flag is False and self._arrival_term_flag is False:
            raise ValueError("You need to define a cost function before setting up the MHE.")
        if self._horizon is None:
            raise ValueError("You must set a horizon length before setting up the MHE.")
        if len(self._x_ub) != len(self._x_lb):
            raise ValueError("x_ub does not have the same dimension as x_lb.")
        if len(self._x_ub) != self._model.n_x:
            raise ValueError(f"x_ub does not have the same dimension as the state of the model. "
                             f"x_ub has dimension {len(self._x_ub)} while "
                             f"the model has {self._model.n_x} states.")
        if len(self._x_lb) != self._model.n_x:
            raise ValueError(f"x_lb does not have the same dimension as the state of the model. "
                             f"x_lb has dimension {len(self._x_lb)} while "
                             f"the model has {self._model.n_x} states.")

    def _scale_problem(self):
        """Apply scaling factors to bounds, guesses, and model variables."""
        self._x_ub = scale_vector(self._x_ub, self._x_scaling)
        self._x_lb = scale_vector(self._x_lb, self._x_scaling)
        self._x_guess = scale_vector(self._x_guess, self._x_scaling)

        self._w_ub = scale_vector(self._w_ub, self._w_scaling)
        self._w_lb = scale_vector(self._w_lb, self._w_scaling)
        self._w_guess = scale_vector(self._w_guess, self._w_scaling)

        if self._model.n_p > 0:
            self._p_ub = scale_vector(self._p_ub, self._p_scaling)
            self._p_lb = scale_vector(self._p_lb, self._p_scaling)
            self._p_guess = scale_vector(self._p_guess, self._p_scaling)

        # ... ode ...
        self._model.scale(self._u_scaling, id='u')
        self._model.scale(self._x_scaling, id='x')
        self._model.scale(self._p_scaling, id='p')

    def _update_arrival_states(self):
        """
        Update the state guess of the arrival cost automatically, depending on the chosen method.

        :return:
        """
        if self._nlp_solution is not None:
            if self._nlp_options['arrival_guess_update'] == 'smoothing':
                return self._nlp_solution['x'][self._x_ind[2]]
            elif self._nlp_options['arrival_guess_update'] == 'filtering':
                raise NotImplementedError("The filtering update is not yet implemented.")
        else:
            return self.quad_arrival_cost.x_guess

    def _update_arrival_param(self):
        """
        Update the parameter guess of the arrival cost automatically.

        :return:
        """
        if self._nlp_solution is not None:
            return self._nlp_solution['x'][self._p_ind[0]]
        else:
            return self.quad_arrival_cost.p_guess
    def add_measurements(self, y_meas, u_meas=None):
        """Add new measurements to the estimation window.

        Measurements are stored in a sliding window of length equal to the horizon. Once the horizon is reached,
        the MHE is ready to estimate.

        :param y_meas: Output measurements at the current time
        :param u_meas: Input measurements at the current time (optional)
        :raises RuntimeError: If the MHE has not been set up via setup()
        """
        if not self._nlp_setup_done:
            raise RuntimeError(f"You need to set up the MHE by running .setup() before running {who_am_i()}")

        # Add measurements to the history
        y_meas = check_and_wrap_to_list(y_meas)
        if u_meas is not None:
            u_meas = check_and_wrap_to_list(u_meas)

        # Quality check
        y_meas, u_meas = self._check_measurements(y_meas, u_meas)

        self._y_history.append(y_meas)
        if u_meas is not None:
            self._u_history.append(u_meas)

        self._meas_counter += 1

        if self._meas_counter == self._horizon:
            # The horizon has been reached. The MHE can start.
            self._horizon_is_reached = True

        if self._meas_counter > self._horizon:
            # Shift the horizon: the oldest measurements are forgotten
            self._y_history.pop(0)
            if u_meas is not None:
                self._u_history.pop(0)
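    # Sketch of feeding the sliding window (assuming `mhe.setup()` was already run and
    # `output_log`/`input_log` are sequences of plant measurements):
    #
    #     for y_k, u_k in zip(output_log, input_log):
    #         mhe.add_measurements(y_k, u_meas=u_k)
    #
    # Until `horizon` measurements have been collected, estimate() returns (None, None).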
    def estimate(self, x_arrival=None, p_arrival=None, v0=None, runs=0, **kwargs):
        """Solve the MHE optimization problem to estimate states and parameters.

        :param x_arrival: Prior state estimate at the beginning of the horizon (optional, auto-updated if None)
        :param p_arrival: Prior parameter estimate (optional, auto-updated if None)
        :param v0: Initial guess for the optimization variables (optional)
        :param runs: Number of optimization runs with random perturbations (default: 0 = single run; if > 0 a
            multistart is performed)
        :param kwargs: Additional options (e.g., pert_factor for the perturbation magnitude)
        :return: Tuple (x_opt, p_opt) with the estimated current state and parameters (None before the horizon is
            reached)
        :raises ValueError: If the MHE has not been set up via setup()
        """
        # TODO: check the shape of p0, x0
        # TODO: to test
        if not self._nlp_setup_done:
            raise ValueError("You need to set up the NLP before optimizing. Type *mheObject*.setup()")

        if not isinstance(runs, int) or runs < 0:
            raise TypeError("The 'runs' parameter must be a non-negative integer.")

        # Update the time
        self._time += self._sampling_interval

        if self._horizon_is_reached:
            # Get external parameters
            param = self._ext_parameters(0)

            if x_arrival is not None:
                x_arrival = check_and_wrap_to_DM(x_arrival)
                if len(x_arrival) != self._model.n_x:
                    raise ValueError(
                        'The model has {} state(s): {}. You must pass me a guess of the values before '
                        'running the optimization'.format(self._model.n_x, self._model.dynamical_state_names))
            else:
                x_arrival = self._update_arrival_states()
            param['x_arrival'] = x_arrival

            if self._model.n_p > 0:
                if p_arrival is not None:
                    p_arrival = check_and_wrap_to_DM(p_arrival)
                else:
                    p_arrival = self._update_arrival_param()
                param['p_arrival'] = p_arrival

            if self._model.n_y > 0:
                param['y_meas'] = np.array(self._y_history).T  # y_meas must have shape (self.meas_num, self._horizon)
            if self._model.n_u > 0:
                param['u_meas'] = np.array(self._u_history).T  # u_meas must have shape (self._model.n_u, self._horizon)

            # TODO: when varying sampling times are implemented, give the possibility to provide varying sampling
            #  times
            dt_grid = None
            if dt_grid is None:
                param['dt'] = ca.repmat(self.sampling_interval, self.horizon)

            param['time'] = self._time

            if v0 is None:
                v0 = self._v0

            if runs == 0:
                sol = self._solver(x0=v0, lbx=self._v_lb, ubx=self._v_ub, lbg=self._g_lb, ubg=self._g_ub, p=param)
                self._nlp_solution = sol
                if self._model.n_p > 0:
                    p_opt = sol['x'][self._p_ind[0]] * self._p_scaling

                # NOTE: this returns the one-step-ahead prediction. To return the filtered prediction one needs to
                #  access -2, but remember that the time entering the solution object must then be one time step back!
                x_opt = sol['x'][self._x_ind[-1]] * self._x_scaling
                self._v0 = sol['x']
            else:
                pert_factor = kwargs.get('pert_factor', 0.1)
                f_r_better = np.inf
                v00 = v0
                for r in range(runs):
                    sol = self._solver(x0=v00, lbx=self._v_lb, ubx=self._v_ub, lbg=self._g_lb, ubg=self._g_ub,
                                       p=param)
                    if sol['f'] < f_r_better:
                        f_r_better = sol['f']
                        self._nlp_solution = sol
                        if self._model.n_p:
                            p_opt = sol['x'][self._p_ind[0]] * self._p_scaling
                        x_opt = sol['x'][self._x_ind[-1]] * self._x_scaling
                        self._v0 = sol['x']
                    v00 = v0 + v0 * (1 - 2 * np.random.rand(self._n_v)) * pert_factor

            # Get the status of the solver
            self._solver_status_wrapper()

            # Print output
            self._print_message()

            if self._model.n_p > 0:
                self._solution.add('p', p_opt)
                self._solution.add('x', x_opt)
                self._solution.add('t', self._time)
                return x_opt, p_opt
            else:
                self._solution.add('x', x_opt)
                self._solution.add('t', self._time)
                return x_opt, None
        else:
            return None, None
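    # Sketch of a closed estimation loop (illustrative; the `plant` object and its
    # `measure()`/`input()` methods are hypothetical stand-ins for the user's plant
    # interface):
    #
    #     for k in range(n_steps):
    #         y_k, u_k = plant.measure(), plant.input()
    #         mhe.add_measurements(y_k, u_meas=u_k)
    #         x_est, p_est = mhe.estimate()
    #         if x_est is not None:
    #             ...  # use the estimate, e.g. feed it to a controller
    #
    # A multistart with 5 randomly perturbed initial guesses would be
    # `mhe.estimate(runs=5, pert_factor=0.1)`.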
    def setup(self, options=None, nlp_opts=None, solver='ipopt'):
        """Configure and build the MHE optimization problem.

        This method constructs the NLP formulation, sets up the solver, and prepares the MHE for estimation. It
        must be called before estimate().

        :param options: Dictionary with NLP formulation options (integration method, collocation settings, etc.)
        :param nlp_opts: Solver-specific options (deprecated, use solver options via set_solver_opts)
        :param solver: Solver name ('ipopt' or 'qpsol', default: 'ipopt')
        """
        if not self._scaling_is_set:
            self.set_scaling()
        if not self._time_varying_parameters_is_set:
            self.set_time_varying_parameters()
        if not self._box_constraints_is_set:
            self.set_box_constraints()
        if not self._initial_guess_is_set:
            self.set_initial_guess()
        if not self._stage_constraints_flag:
            self.set_aux_nonlinear_constraints()
        if not self._nlp_options_is_set:
            self.set_nlp_options(options)
        if not self._nlp_solver_is_set:
            self.set_nlp_solver(solver)  # solver type
        if not self._solver_options_is_set:
            self.set_solver_opts()  # solver-specific options, e.g. ipopt.*
        if not self._sampling_time_is_set:
            self.set_sampling_interval()

        # Set up the solution TimeSeries
        self._populate_solution()

        # Define cost terms
        self._define_cost_terms()

        # Scaling...
        self._scale_problem()

        # Check if the NLP is well posed
        self._check_mhe_is_well_posed()

        # Get all the variables of the time-varying references. These need to be passed to the stage function.
        t_ref_placeholder = ca.SX.sym('t_ref', 0)
        for tv_ref in self.quad_stage_cost._tv_ref_list:
            t_ref_placeholder = ca.vertcat(t_ref_placeholder, tv_ref['placeholder'])

        # Get all the variables of the references that change once per iteration. These need to be passed to the
        # stage function.
        p_arrival = ca.SX.sym('p_arrival', 0)
        x_arrival = ca.SX.sym('x_arrival', 0)
        for iter_ref in self.quad_arrival_cost._iter_ref_list:
            if iter_ref['type'] == 'parameters':
                p_arrival = iter_ref['placeholder']
            elif iter_ref['type'] == 'states':
                x_arrival = iter_ref['placeholder']

        if self._nlp_setup_done is False:
            model = self._model

            # ... objective function.
            if self._arrival_term_flag:
                self._arrival_term_fun = ca.Function('arrival_term', [model.x, model.p, x_arrival, p_arrival],
                                                     [self._arrival_term])
            if self._stage_term_flag:
                self._stage_term_fun = ca.Function('stage_term', [self._w, model.x, model.u, t_ref_placeholder],
                                                   [self._stage_term])

            # Check time-varying parameters
            tvp_ind = []
            if len(self._time_varying_parameters) != 0:
                p_names = model.parameter_names
                for tvp in self._time_varying_parameters:
                    assert tvp in p_names, f"The time-varying parameter {tvp} is not in the model parameters. " \
                                           f"The model parameters are {p_names}."
                    tvp_ind.append(p_names.index(tvp))
            self._time_varying_parameters_ind = tvp_ind
            self._n_tvp = len(tvp_ind)

            # Define a CasADi function of the auxiliary nonlinear constraints
            if self.stage_constraint.is_set:
                if self.stage_constraint.is_soft:
                    e_stage = ca.SX.sym('e_stage', self.stage_constraint.constraint.size1())
                    self._stage_constraints_fun = ca.Function(
                        'soft_aux_nl_constr', [model.x, self._w, model.p, e_stage],
                        [ca.vertcat(self.stage_constraint.constraint - e_stage,
                                    -self.stage_constraint.constraint - e_stage)])
                else:
                    self._stage_constraints_fun = ca.Function('soft_aux_nl_constr', [model.x, self._w, model.p],
                                                              [self.stage_constraint.constraint])

            problem = dict(self._model)
            if self._nlp_options['integration_method'] == 'collocation':
                continuous2discrete(problem, **self._nlp_options)

                # Slack for soft constraints
                ek = ca.SX.sym('e', self.stage_constraint.size)

                # Add all constraints to the collocation points box constraints
                # TODO: is here options['degree'] + 1 or problem['ode']???
                n_xik = self._nlp_options['degree'] * model.n_x
                n_zik = self._nlp_options['degree'] * model.n_z

                x_ik_guess = np.tile(self._x_guess, self._nlp_options['degree'])
                x_ik_ub = np.tile(self._x_ub, self._nlp_options['degree'])
                x_ik_lb = np.tile(self._x_lb, self._nlp_options['degree'])
                if model.n_z > 0:
                    z_ik_guess = np.tile(self._z_guess, self._nlp_options['degree'])
                    z_ik_ub = np.tile(self._z_ub, self._nlp_options['degree'])
                    z_ik_lb = np.tile(self._z_lb, self._nlp_options['degree'])

                # Nonlinear constraints in the control interval
                gk_col = []
                gk_col_lb = []
                gk_col_ub = []
                for k in range(self._nlp_options['degree']):
                    x_col = problem['collocation_points_ode'][k]
                    if self.stage_constraint.is_set:
                        if self.stage_constraint.is_soft:
                            residual = self._stage_constraints_fun(x_col, self._w, problem['p'], ek)
                            gk_col.append(residual)
                            gk_col_lb.append(np.repeat(-np.inf, self.stage_constraint.size * 2))
                            gk_col_ub.append(self.stage_constraint.ub)
                            gk_col_ub.append([-lb for lb in self.stage_constraint.lb])
                        else:
                            residual = self._stage_constraints_fun(x_col, self._w, problem['p'])
                            gk_col.append(residual)
                            gk_col_lb.append(self.stage_constraint.lb)
                            gk_col_ub.append(self.stage_constraint.ub)

                gk_col.append(problem['collocation_equations'])
                gk_col_lb.append(np.zeros(problem['collocation_equations'].shape[0]))
                gk_col_ub.append(np.zeros(problem['collocation_equations'].shape[0]))

                # Create the integrator function
                int_dynamics_fun = ca.Function(
                    'integrator_collocation',
                    [problem['t'],  # time variable (for time-varying systems)
                     problem['dt'],  # dt variable (for possibly different sampling times)
                     ca.vertcat(*problem['collocation_points_ode']),  # x at collocation points
                     problem['x'],  # x at the beginning of the interval
                     problem['u'],  # input of the interval
                     ca.vertcat(*problem['collocation_points_alg']),  # algebraic states at collocation points
                     problem['p'],  # parameters (constant over the interval at least)
                     ek,  # slack variable for (possibly) soft constrained systems
                     # self._w,
                     t_ref_placeholder],
                    [ca.vertcat(*gk_col), problem['ode']])
            elif self._nlp_options['integration_method'] == 'discrete':
                int_dynamics_fun = ca.Function('integrator_discrete',
                                               [problem['t'], problem['dt'], problem['x'], problem['u'],
                                                problem['p']],
                                               [problem['ode']])
            elif self._nlp_options['integration_method'] == 'multiple_shooting':
                x = problem['x']
                u = problem['u']
                p = problem['p']
                z = problem['z']
                if model.n_z == 0:
                    dae = {'x': x, 'p': ca.vertcat(u, p, t_ref_placeholder), 'ode': model.ode}
                else:
                    dae = {'x': ca.vertcat(x, z), 'p': ca.vertcat(u, p, t_ref_placeholder),
                           'ode': ca.vertcat(model.ode, model.alg)}
                opts = {'abstol': 1e-10, 'reltol': 1e-10}
                int_dynamics_fun = ca.integrator("integrator_ms", model.solver, dae, 0, self._sampling_interval,
                                                 opts)
            else:
                raise ValueError(f"Integration method {self._nlp_options['integration_method']} is not defined.")

            # Total number of optimization variables
            n_v = model.n_x * (self._horizon + 1) + self._model._n_p
            if self._state_noise_flag:
                n_v += self._horizon * model.n_x
            if self._nlp_options['integration_method'] == 'collocation':
                n_v += self._horizon * (n_xik + n_zik)
            if self.stage_constraint.is_soft:
                n_v += self.stage_constraint.constraint.size1()

            v = ca.MX.sym('v', n_v)

            # All variables with bounds and initial guess
            v_lb = np.zeros(n_v)
            v_ub = np.zeros(n_v)
            v_guess = np.zeros(n_v)
            offset = 0

            # Get parameters
            if self._model.n_p > 0:
                p_ind = self._p_ind
                p = v[offset:offset + self._model.n_p]
                p_ind.append([j for j in range(offset, offset + self._model.n_p)])
                v_lb[offset:offset + self._model.n_p] = self._p_lb
                v_ub[offset:offset + self._model.n_p] = self._p_ub
                v_guess[offset:offset + self._model.n_p] = self._p_guess
                offset += self._model.n_p
            else:
                p = []

            # Predefine the state optimization variables
            x = np.resize(np.array([], dtype=ca.MX), (self._horizon + 1, 1))
            x_ind = self._x_ind
            for ii in range(self._horizon + 1):
                x[ii, 0] = v[offset:offset + model.n_x]
                x_ind.append([j for j in range(offset, offset + model.n_x)])
                v_guess[offset:offset + model.n_x] = self._x_guess
                v_lb[offset:offset + model.n_x] = self._x_lb
                v_ub[offset:offset + model.n_x] = self._x_ub
                offset += model.n_x

            # Predefine the state noise optimization variables
            if self._state_noise_flag:
                w = np.resize(np.array([], dtype=ca.MX), (self._horizon, 1))
                w_ind = self._w_ind
                for ii in range(self._horizon):
                    w[ii, 0] = v[offset:offset + model.n_x]
                    w_ind.append([j for j in range(offset, offset + model.n_x)])
                    v_guess[offset:offset + model.n_x] = self._w_guess
                    v_lb[offset:offset + model.n_x] = self._w_lb
                    v_ub[offset:offset + model.n_x] = self._w_ub
                    offset += model.n_x

            if self._nlp_options['integration_method'] == 'collocation':
                # NOTE: the estimation horizon is used here; the original code referenced a prediction horizon,
                #  which the MHE does not define.
                ip = np.resize(np.array([], dtype=ca.MX), (self._horizon, 1))
                zp = np.resize(np.array([], dtype=ca.MX), (self._horizon, 1))
                for ii in range(self._horizon):
                    ip[ii, 0] = v[offset:offset + n_xik]
                    v_guess[offset:offset + n_xik] = x_ik_guess
                    v_lb[offset:offset + n_xik] = x_ik_lb
                    v_ub[offset:offset + n_xik] = x_ik_ub
                    offset += n_xik
                    if model.n_z > 0:
                        zp[ii, 0] = v[offset:offset + n_zik]
                        v_guess[offset:offset + n_zik] = z_ik_guess
                        v_lb[offset:offset + n_zik] = z_ik_lb
                        v_ub[offset:offset + n_zik] = z_ik_ub
                        offset += n_zik

            if self.stage_constraint.is_soft:
                e_soft_stage = v[offset:offset + self.stage_constraint.size]
                self._e_soft_stage_ind = [j for j in range(offset, offset + self.stage_constraint.size)]
                v_lb[offset:offset + self.stage_constraint.size] = np.zeros(self.stage_constraint.size)
                v_ub[offset:offset + self.stage_constraint.size] = self.stage_constraint.max_violation
                v_guess[offset:offset + self.stage_constraint.size] = np.zeros(self.stage_constraint.size)
                offset += self.stage_constraint.size
            else:
                e_soft_stage = ca.MX.sym('e_soft_stage', 0)

            # These are parameters that need to be passed to the solver before running the optimization
            ext_parameters = catools.struct_symMX([catools.entry('tv_p', shape=(self._n_tvp, self._horizon)),
                                                   catools.entry('p_arrival', shape=p_arrival.shape[0]),
                                                   catools.entry('x_arrival', shape=x_arrival.shape[0]),
                                                   catools.entry('u_meas', shape=(self._model.n_u, self._horizon)),
                                                   catools.entry('y_meas',
                                                                 shape=(self.quad_stage_cost._n_tv_refs,
                                                                        self._horizon)),
                                                   catools.entry('dt', shape=self._horizon),
                                                   catools.entry('time', shape=1)])
            u_meas = ext_parameters['u_meas']
            y_meas = ext_parameters['y_meas']
            p_arrival = ext_parameters['p_arrival']
            x_arrival = ext_parameters['x_arrival']
            _dt = ext_parameters['dt']

            # Constraint function for the NLP
            g = []
            g_lb = []
            g_ub = []
            J = 0

            # Current time
            time_now = ext_parameters['time']
            # Time at the beginning of the horizon (in the past)
            time = time_now - ca.sum1(_dt)

            for ii in range(self._horizon):
                x_ii = x[ii, 0]
                if self._state_noise_flag:
                    w_ii = w[ii, 0]
                else:
                    w_ii = []
                u_ii = u_meas[:, ii]
                y_ii = y_meas[:, ii]
                dt_ii = _dt[ii]

                if self._nlp_options['integration_method'] == 'multiple_shooting':
                    # TODO: here somewhere t has to enter
                    sol = int_dynamics_fun(x0=x_ii, p=ca.vertcat(u_ii, p, y_ii))
                    # Add state noise. The state noise is assumed to be in discrete form, so it is added here.
                    if self._state_noise_flag:
                        x_ii_1 = sol['xf'] + w_ii
                    else:
                        x_ii_1 = sol['xf']
                elif self._nlp_options['integration_method'] == 'collocation':
                    [g_coll, x_ii_1] = int_dynamics_fun(time + dt_ii, dt_ii, ip[ii, 0], x_ii, u_ii, zp[ii, 0], p,
                                                        e_soft_stage, y_ii)
                    g.append(g_coll)
                    g_lb.extend(gk_col_lb)
                    g_ub.extend(gk_col_ub)
                    # Add state noise. The state noise is assumed to be in discrete form, so it is added here.
                    if self._state_noise_flag:
                        x_ii_1 = x_ii_1 + w_ii
                elif self._nlp_options['integration_method'] == 'discrete':
                    x_ii_1 = int_dynamics_fun(time + dt_ii, dt_ii, x_ii, u_ii, p)
                    if self._state_noise_flag:
                        x_ii_1 = x_ii_1 + w_ii

                g.append(x[ii + 1, 0] - x_ii_1)
                g_lb.append(np.zeros(model.n_x))
                g_ub.append(np.zeros(model.n_x))

                # Add Lagrange term
                if ii == 0:
                    if self._arrival_term_flag:
                        J += self._arrival_term_fun(x_ii, p, x_arrival, p_arrival)
                else:
                    if self._stage_term_flag:
                        J += self._stage_term_fun(w_ii, x_ii, u_ii, y_ii)

                if self.stage_constraint.is_set:
                    if self.stage_constraint.is_soft:
                        residual = self._stage_constraints_fun(x_ii, w_ii, p, e_soft_stage)
                        J += self.stage_constraint.cost(e_soft_stage)
                        g.append(residual)
                        g_lb.append([-ca.inf] * self.stage_constraint.size * 2)
                        g_ub.append(self.stage_constraint.ub)
                        g_ub.append([-lb for lb in self.stage_constraint.lb])
                    else:
                        residual = self._stage_constraints_fun(x_ii, w_ii, p)
                        g.append(residual)
                        g_lb.append(self.stage_constraint.lb)
                        g_ub.append(self.stage_constraint.ub)

                # Update time in the horizon
                time += dt_ii

            if self._custom_constraints_flag:
                raise NotImplementedError('Custom constraints are not yet implemented')

            if self._stage_constraints_flag:
                raise NotImplementedError('Auxiliary constraints are not yet implemented')

            g = ca.vertcat(*g)
            self._g_lb = ca.DM(ca.vertcat(*g_lb))
            self._g_ub = ca.DM(ca.vertcat(*g_ub))
            self._v0 = ca.DM(v_guess)
            self._v_lb = ca.DM(v_lb)
            self._v_ub = ca.DM(v_ub)
            self._J = J
            self._v = v
            self._param_npl_mhe = ext_parameters
            self._g = g
            self._nlp_setup_done = True
            self._n_v = n_v
            self._ext_parameters = ext_parameters

            nlp_dict = {'f': self._J, 'x': self._v, 'p': self._param_npl_mhe, 'g': self._g}

            if self._solver_name == 'ipopt':
                solver = ca.nlpsol("solver", 'ipopt', nlp_dict, self._nlp_opts)
            elif self._solver_name == 'qpsol':
                solver = ca.qpsol("solver", 'qpoases', nlp_dict, self._nlp_opts)
            else:
                raise ValueError(f"The solver {self._solver_name} does not exist. The possible solvers are "
                                 f"{self._solver_name_list}.")

            self._solver = solver
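    # Typical setup calls (a sketch; the option values shown are the documented defaults
    # or allowed alternatives):
    #
    #     mhe.setup()                                          # collocation + IPOPT
    #     mhe.setup(options={'integration_method': 'discrete'})
    #     mhe.setup(options={'degree': 2, 'collocation_points': 'legendre'}, solver='ipopt')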
    def set_nlp_options(self, *args, **kwargs):
        """
        Sets the options that modify how the MHE problem is set up.

        :param args:
        :param kwargs:
        :return:
        """
        # TODO: when multiple shooting and irk are implemented/tested, add them to the list
        possible_choices = {}
        possible_choices['integration_method'] = ['collocation', 'rk4', 'erk', 'discrete',
                                                  'multiple_shooting']  # 'irk'
        possible_choices['collocation_points'] = ['radau', 'legendre']
        possible_choices['degree'] = None
        possible_choices['print_level'] = [0, 1]
        possible_choices['arrival_guess_update'] = ['filtering', 'smoothing']
        option_list = list(possible_choices.keys())

        default_opts = {
            'integration_method': 'collocation',
            'collocation_points': 'radau',
            'degree': 3,
            'print_level': 1,
            'arrival_guess_update': 'smoothing'
        }

        opts = {}
        if len(args) != 0:
            if isinstance(args[0], dict):
                opts = args[0]
        else:
            if len(kwargs) != 0:
                opts = kwargs

        for key, value in opts.items():
            if key not in option_list:
                raise ValueError(f"The option named {key} does not exist. Possible options are {option_list}.")
            if possible_choices[key] is not None and value not in possible_choices[key]:
                raise ValueError(
                    f"The option {key} is set to value {value} but the only allowed values are "
                    f"{possible_choices[key]}."
                )
            else:
                default_opts[key] = value

        if default_opts.get('integration_method') != 'discrete' and self._model.discrete is True:
            warnings.warn(
                f"The integration method is set to {default_opts.get('integration_method')} but the model is in "
                f"discrete time. Overwriting and using discrete mode."
            )
            default_opts['integration_method'] = 'discrete'

        # Integration methods. These are necessary for the RungeKutta class.
        if default_opts['integration_method'] == 'rk4':
            default_opts['class'] = 'explicit'
            default_opts['method'] = 'rk'
            default_opts['order'] = 4
            default_opts['category'] = 'runge-kutta'
        elif default_opts['integration_method'] == 'erk':
            default_opts['class'] = 'explicit'
            default_opts['method'] = 'rk'
            default_opts['category'] = 'runge-kutta'
        elif default_opts['integration_method'] == 'irk':
            default_opts['class'] = 'implicit'
            default_opts['method'] = 'rk'
            default_opts['category'] = 'runge-kutta'
        elif default_opts['integration_method'] == 'collocation':
            default_opts['class'] = 'implicit'
            default_opts['method'] = 'collocation'
            default_opts['category'] = 'runge-kutta'

        self._nlp_options_is_set = True
        self._nlp_options = default_opts
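    # Example options dictionary (a sketch; these are exactly the keys validated above):
    #
    #     mhe.set_nlp_options({'integration_method': 'collocation',
    #                          'collocation_points': 'radau',
    #                          'degree': 3,
    #                          'print_level': 1,
    #                          'arrival_guess_update': 'smoothing'})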
    def set_scaling(self, x_scaling=None, w_scaling=None, p_scaling=None, u_scaling=None):
        """
        Scales the states and inputs. This is important for systems whose states and inputs differ by several
        orders of magnitude.

        :param x_scaling: list of scaling factors
        :param w_scaling: list of scaling factors
        :param p_scaling: list of scaling factors
        :param u_scaling: list of scaling factors
        :return:
        """
        if x_scaling is None:
            self._x_scaling = self._model.n_x * [1]
        elif isinstance(x_scaling, (list, np.ndarray)):
            self._x_scaling = x_scaling
        else:
            raise ValueError("Scaling factors x_scaling must be a list or np.ndarray")

        if w_scaling is None:
            self._w_scaling = self._model.n_x * [1]
        elif isinstance(w_scaling, (list, np.ndarray)):
            self._w_scaling = w_scaling
        else:
            raise ValueError("Scaling factors w_scaling must be a list or np.ndarray")

        if p_scaling is None:
            self._p_scaling = self._model.n_p * [1]
        elif isinstance(p_scaling, (list, np.ndarray)):
            self._p_scaling = p_scaling
        else:
            raise ValueError("Scaling factors p_scaling must be a list or np.ndarray")

        if u_scaling is None:
            self._u_scaling = self._model.n_u * [1]
        elif isinstance(u_scaling, (list, np.ndarray)):
            self._u_scaling = u_scaling
        else:
            raise ValueError("Scaling factors u_scaling must be a list or np.ndarray")

        self._scaling_is_set = True
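    # Example (sketch): for a two-state model whose states live on scales of roughly
    # 1e3 and 1, with one parameter of order 1e-2:
    #
    #     mhe.set_scaling(x_scaling=[1000., 1.], p_scaling=[0.01])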
    def set_time_varying_parameters(self, time_varying_parameters=None):
        """
        Sets the time-varying parameters for the estimator. The estimator will expect these as an input at every
        iteration.

        :param time_varying_parameters: list of strings with the names of the time-varying parameters
        :return:
        """
        # TODO: check if it is the same as the parent class; if yes, delete
        if time_varying_parameters is None:
            self._time_varying_parameters = []
        else:
            if not check_if_list_of_string(time_varying_parameters):
                raise ValueError("tvp must be a list of strings with the names of the time-varying parameters")
            for tvp in time_varying_parameters:
                if tvp not in self._model.parameter_names:
                    raise ValueError(f"The time-varying parameter {tvp} is not in the model parameters. "
                                     f"The model parameters are {self._model.parameter_names}.")
            self._time_varying_parameters = time_varying_parameters
        self._time_varying_parameters_is_set = True
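    # Example (sketch; assumes the model declares a parameter named 'T_amb'):
    #
    #     mhe.set_time_varying_parameters(['T_amb'])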
    def set_box_constraints(self, x_ub=None, x_lb=None, w_ub=None, w_lb=None, p_lb=None, p_ub=None, z_ub=None,
                            z_lb=None):
        """Set box constraints on states, noise, parameters, and algebraic variables.

        :param x_ub: Upper bounds for the states (default: inf)
        :param x_lb: Lower bounds for the states (default: -inf)
        :param w_ub: Upper bounds for the state noise (default: inf)
        :param w_lb: Lower bounds for the state noise (default: -inf)
        :param p_lb: Lower bounds for the parameters (default: -inf)
        :param p_ub: Upper bounds for the parameters (default: inf)
        :param z_ub: Upper bounds for the algebraic states (default: inf)
        :param z_lb: Lower bounds for the algebraic states (default: -inf)
        :raises TypeError: If the dimension of the algebraic state bounds does not match the model
        """
        if x_ub is not None:
            self._x_ub = check_and_wrap_to_list(x_ub)
        else:
            self._x_ub = self._model.n_x * [ca.inf]
        if x_lb is not None:
            self._x_lb = check_and_wrap_to_list(x_lb)
        else:
            self._x_lb = self._model.n_x * [-ca.inf]
        if w_ub is not None:
            self._w_ub = check_and_wrap_to_list(w_ub)
        else:
            self._w_ub = self._model.n_x * [ca.inf]
        if w_lb is not None:
            self._w_lb = check_and_wrap_to_list(w_lb)
        else:
            self._w_lb = self._model.n_x * [-ca.inf]

        if self._model._n_p > 0:
            if p_ub is not None:
                self._p_ub = check_and_wrap_to_list(p_ub)
            else:
                warnings.warn("Some parameters are uncertain but no upper bounds were given. Assuming they are "
                              "infinite.")
                self._p_ub = self._model._n_p * [ca.inf]
            if p_lb is not None:
                self._p_lb = check_and_wrap_to_list(p_lb)
            else:
                warnings.warn("Some parameters are uncertain but no lower bounds were given. Assuming they are "
                              "-infinite.")
                self._p_lb = self._model._n_p * [-ca.inf]

        # Algebraic constraints
        if z_ub is not None:
            z_ub = check_and_wrap_to_list(z_ub)
            if len(z_ub) != self._model.n_z:
                raise TypeError(f"The model has {self._model.n_z} algebraic states. "
                                f"You need to pass the same number of bounds.")
            self._z_ub = z_ub
        else:
            self._z_ub = self._model.n_z * [ca.inf]
        if z_lb is not None:
            z_lb = check_and_wrap_to_list(z_lb)
            if len(z_lb) != self._model.n_z:
                raise TypeError(f"The model has {self._model.n_z} algebraic states. "
                                f"You need to pass the same number of bounds.")
            self._z_lb = z_lb
        else:
            self._z_lb = self._model.n_z * [-ca.inf]

        self._box_constraints_is_set = True
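    # Example (sketch) for a two-state model with one estimated parameter:
    #
    #     mhe.set_box_constraints(x_lb=[0., 0.], x_ub=[10., 5.],
    #                             w_lb=[-0.1, -0.1], w_ub=[0.1, 0.1],
    #                             p_lb=[0.], p_ub=[2.])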
    def set_initial_guess(self, x_guess=None, w_guess=None, p_guess=None, z_guess=None):
        """Set the initial guess for the optimization variables.

        If not provided, guesses are computed from the constraint bounds (midpoint) or set to zero.

        :param x_guess: Initial guess for the states
        :param w_guess: Initial guess for the state noise (default: zeros)
        :param p_guess: Initial guess for the parameters
        :param z_guess: Initial guess for the algebraic states
        :raises ValueError: If the guess dimensions don't match the model
        """
        if x_guess is not None:
            self._x_guess = check_and_wrap_to_list(x_guess)
            if len(self._x_guess) != self._model.n_x:
                raise ValueError(
                    f"x_guess dimension and model state dimension do not match. The model has "
                    f"{self._model.n_x} states while x_guess has dimension {len(self._x_guess)}."
                )
        else:
            if -ca.inf not in self._x_lb and ca.inf not in self._x_ub:
                self._x_guess = [(self._x_ub[i] + self._x_lb[i]) / 2 for i in range(len(self._x_ub))]
            else:
                self._x_guess = self._model.n_x * [0]

        if w_guess is not None:
            self._w_guess = check_and_wrap_to_list(w_guess)
        else:
            self._w_guess = self._model.n_x * [0]
        if len(self._w_guess) != self._model.n_x:
            raise ValueError(
                f"w_guess dimension and the dimension of the model state do not match. "
                f"The state vector has dimension {self._model.n_x} while w_guess has {len(self._w_guess)}."
            )

        if self._model.n_p > 0:
            if p_guess is not None:
                self._p_guess = check_and_wrap_to_list(p_guess)
            else:
                if -ca.inf not in self._p_lb and ca.inf not in self._p_ub:
                    self._p_guess = [(self._p_ub[i] + self._p_lb[i]) / 2 for i in range(len(self._p_ub))]
                else:
                    self._p_guess = self._model.n_p * [0]
            if len(self._p_guess) != self._model._n_p:
                raise ValueError(
                    f"p_guess dimension and the dimension of the uncertain parameters do not match. "
                    f"There are {self._model._n_p} uncertain parameters while p_guess is {len(self._p_guess)} long."
                )

        if z_guess is not None:
            self._z_guess = check_and_wrap_to_list(deepcopy(z_guess))
            if len(self._z_guess) != self._model.n_z:
                raise ValueError(
                    f"z_guess dimension and model z dimension do not match. The model has {self._model.n_z} "
                    f"algebraic states while z_guess has dimension {len(self._z_guess)}."
                )
        else:
            try:
                if -ca.inf not in self._z_lb and ca.inf not in self._z_ub:
                    self._z_guess = [(u + l) / 2 for u, l in zip(self._z_ub, self._z_lb)]
                else:
                    self._z_guess = self._model.n_z * [0]
            except AttributeError:
                self._z_guess = self._model.n_z * [0]

        self._initial_guess_is_set = True
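    # Example (sketch) matching the bounds shown above; omitted guesses fall back to
    # the bound midpoints or zeros:
    #
    #     mhe.set_initial_guess(x_guess=[5., 2.5], p_guess=[1.])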
    def set_aux_nonlinear_constraints(self, aux_nl_const=None, ub=None, lb=None):
        """Set auxiliary nonlinear constraints (deprecated, use stage_constraint instead).

        :param aux_nl_const: CasADi expression for the constraint function
        :param ub: Upper bounds for the constraints
        :param lb: Lower bounds for the constraints
        :raises ValueError: If the constraint function is provided without bounds or vice versa
        """
        if None not in [aux_nl_const, ub, lb]:
            self._stage_constraints = aux_nl_const
            self._stage_constraints_lb = lb
            self._stage_constraints_ub = ub
            self._stage_constraints_flag = True
        elif check_if_list_of_none([aux_nl_const, ub, lb]):
            pass
        else:
            raise ValueError("When passing nonlinear constraints, you must pass "
                             "the nonlinear constraint function and its lower and upper bounds.")
    def plot_mhe_estimation(self, save_plot=False, plot_dir=None, name_file='mhe_estimation.html', show_plot=True,
                            extras=None, extras_names=None, title=None, format_figure=None):
        """
        Plots the estimated state trajectory and, if present, the estimated state noise over the current horizon.

        :param save_plot: if True, the plot will be saved under 'plot_dir/name_file' if declared, otherwise in
            the current directory
        :param plot_dir: path to the folder where plots are saved (default = None)
        :param name_file: name of the file where the plot will be saved (default = 'mhe_estimation.html')
        :param show_plot: if True, shows the plots (default = True)
        :param extras: dictionary with values that will be plotted over the predictions if its keys are equal to
            predicted states/inputs
        :param extras_names: tags that will be attached to the extras in the legend
        :param title: title of the plots
        :param format_figure: Python function that modifies the format of the figure
        :return:
        """
        import os

        from bokeh.io import output_file, show, save
        from bokeh.plotting import figure
        from bokeh.models import ColumnDataSource, DataTable, TableColumn, CellFormatter, Div
        from bokeh.layouts import gridplot, row, column

        time = self.current_time - self.sampling_interval * self._horizon

        if self._nlp_solution is None:
            raise RuntimeError("You need to run the MHE at least once to see the plots")

        if save_plot:
            if plot_dir is not None:
                output_file(os.path.join(plot_dir, name_file))
            else:
                output_file(name_file)

        if extras is None:
            extras = []
        if extras_names is None:
            extras_names = []
        if isinstance(extras, dict):
            extras = [extras]
        elif not isinstance(extras, list) and not isinstance(extras, dict):
            raise ValueError("The extras options should be a dictionary or a list of dictionaries")

        res_extras_list = len(extras) * [0]
        for i in range(len(extras)):
            res_extras_list[i] = {}
            for k, name in enumerate(self._model.dynamical_state_names):
                if name in extras[i].keys():
                    res_extras_list[i][name] = extras[i][name]

        if len(extras) != len(extras_names):
            raise ValueError("The length of the extras and extras_names must be the same.")

        # TODO: consider the time step for the x axis
        x_pred, w_pred = self.return_mhe_estimation()

        noise_dict = {i: [] for i in self._model.dynamical_state_names}
        states_dict = {i: [] for i in self._model.dynamical_state_names}
        for k, name in enumerate(self._model.dynamical_state_names):
            noise_dict[name] = w_pred[k, :]
        for k, name in enumerate(self._model.dynamical_state_names):
            states_dict[name] = x_pred[k, :]

        p1 = [figure(title=title, background_fill_color="#fafafa") for i in range(self._model.n_x)]
        time_vector = np.linspace(time, time + self._horizon * self.sampling_interval, self._horizon + 1)
        for s, name in enumerate(self._model.dynamical_state_names):
            p1[s].line(x=time_vector, y=states_dict[name], legend_label=name + '_est', line_width=2)
            p1[s].yaxis.axis_label = name
            p1[s].xaxis.axis_label = 'time'
            if format_figure is not None:
                p1[s] = format_figure(p1[s])

        if self._state_noise_flag:
            # NOTE: a list comprehension is used so that each state gets its own figure; the original
            #  list-multiplication repeated one figure object n_x times.
            p2 = [figure(title=title, background_fill_color="#fafafa") for _ in range(self._model.n_x)]
            for s, name in enumerate(self._model.dynamical_state_names):
                p2[s].step(x=time_vector[:-1], y=noise_dict[name], legend_label=name + "_noise", mode='after',
                           line_width=2)
                p2[s].yaxis.axis_label = name
                p2[s].xaxis.axis_label = "time"
                if format_figure is not None:
                    p2[s] = format_figure(p2[s])

        # Create some data to print statistics
        variables = []
        values = []
        heading = Div(text="MHE stats", height=80, sizing_mode='stretch_width', align='center',
                      style={'font-size': '200%'})  # heading fills the available width
        data = dict(
            variables=variables,
            values=values,
        )
        source = ColumnDataSource(data)
        columns = [
            TableColumn(field='variables', title="Variables"),
            TableColumn(field='values', title="Values", formatter=CellFormatter()),
        ]
        data_table = DataTable(source=source, columns=columns, width=400, height=280)

        grid_states = gridplot(p1, ncols=3, sizing_mode='stretch_width')
        states_header = Div(text="Predicted States", height=10, sizing_mode='stretch_width', align='center',
                            style={'font-size': '200%'})
        if self._state_noise_flag:
            grid_noise = gridplot(p2, ncols=3, sizing_mode='stretch_width')
            inputs_header = Div(text="Predicted Noise", height=10, sizing_mode='stretch_width', align='center',
                                style={'font-size': '200%'})
            layout = row(column(states_header, grid_states, inputs_header, grid_noise),
                         column(heading, data_table))
        else:
            layout = row(column(states_header, grid_states), column(heading, data_table))

        if show_plot:
            show(layout, browser='google-chrome')
        else:
            if save_plot:
                save(layout)
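    # Example (sketch): after at least one successful estimate() call,
    #
    #     mhe.plot_mhe_estimation(save_plot=True, plot_dir='results',
    #                             name_file='mhe_estimation.html', show_plot=False)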
    def return_mhe_estimation(self):
        """Extract the state trajectory and noise estimates from the MHE solution.

        :return: Tuple (x_pred, w_pred) with the state trajectory (n_x × horizon+1) and the noise trajectory
            (n_x × horizon), or (None, None) if no solution is available
        """
        if self._nlp_solution is not None:
            x_pred = np.zeros((self._model.n_x, self._horizon + 1))
            w_pred = np.zeros((self._model.n_x, self._horizon))
            for ii in range(self._horizon + 1):
                x_pred[:, ii] = np.asarray(self._nlp_solution['x'][self._x_ind[ii]]).squeeze() * self._x_scaling
            if self._state_noise_flag:
                for ii in range(self._horizon):
                    w_pred[:, ii] = np.asarray(self._nlp_solution['x'][self._w_ind[ii]]).squeeze() * self._w_scaling
            else:
                w_pred = None
            return x_pred, w_pred
        else:
            warnings.warn("There is no MHE solution available yet. Run estimate() to get one.")
            return None, None
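    # Example (sketch): inspect the estimated trajectories over the current window,
    #
    #     x_traj, w_traj = mhe.return_mhe_estimation()  # shapes (n_x, N+1) and (n_x, N)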
    @property
    def has_state_noise(self):
        """Check if state noise is included in the MHE formulation.

        :return: True if state noise variables are used, False otherwise
        """
        return self._state_noise_flag

    @has_state_noise.setter
    def has_state_noise(self, arg):
        """Set whether to include state noise in the MHE formulation.

        :param arg: Boolean flag to enable/disable state noise estimation
        :raises TypeError: If arg is not a boolean
        """
        if not isinstance(arg, bool):
            raise TypeError("has_state_noise accepts True or False")
        self._state_noise_flag = arg
__all__ = [
    'MovingHorizonEstimator'
]