#
# This file is part of HILO-MPC
#
# HILO-MPC is a toolbox for easy, flexible and fast development of machine-learning-supported
# optimal control and estimation problems
#
# Copyright (c) 2021 Johannes Pohlodek, Bruno Morabito, Rolf Findeisen
# All rights reserved
#
# HILO-MPC is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# HILO-MPC is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with HILO-MPC. If not, see <http://www.gnu.org/licenses/>.
#
from copy import deepcopy
import time
import warnings
import casadi as ca
import casadi.tools as catools
import numpy as np
from scipy.special import erfinv
import hilo_mpc
from .base import Controller
from ..dynamic_model.dynamic_model import Model
from ..optimizer import DynamicOptimization
from ...util.modeling import GenericCost, QuadraticCost, GenericConstraint, continuous2discrete
from ...util.optimizer import IpoptDebugger
from ...util.util import check_and_wrap_to_list, check_and_wrap_to_DM, scale_vector, clip
[docs]class NMPC(Controller, DynamicOptimization):
"""Class for Nonlinear Model Predictive Control"""
    def __init__(self, model, id=None, name=None, plot_backend=None, use_sx=True, stats=False):
        """Constructor method

        :param model: dynamic model to be controlled
        :param id: optional identifier of the controller
        :param name: optional display name of the controller
        :param plot_backend: plotting backend forwarded to the base classes
        :param use_sx: if True, CasADi SX symbols are used; otherwise MX (needed e.g. for spline-interpolated paths)
        :param stats: if True, solver statistics are collected at every solve
        """
        # TODO: when discrete_u or discrete_x is given to the opts structure, but NMPC is used, raise an error saying
        # that MINMPC should be used
        super().__init__(model, id=id, name=name, plot_backend=plot_backend, stats=stats, use_sx=use_sx)
        # Flags tracking which parts of the problem have been defined so far
        self._may_term_flag = False
        self._lag_term_flag = False
        self._prediction_horizon_is_set = False
        self._control_horizon_is_set = False
        self._change_input_term_flag = False
        self._mixed_integer_flag = False
        self._trajectory_following_flag = False
        self._mpc_options_is_set = False
        # self._tau = ca.SX.sym('tau', 0)
        self._paths_var_list = []
        # Internal clock, advanced by one sampling interval per optimize() call
        self._time = 0
        # Initialize the costs
        self.stage_cost = GenericCost(self._model, use_sx=use_sx)
        self.terminal_cost = GenericCost(self._model, use_sx=use_sx)
        self.quad_stage_cost = QuadraticCost(self._model, use_sx=use_sx)
        self.quad_terminal_cost = QuadraticCost(self._model, use_sx=use_sx)
        self._time_var = []
        # Initialize generic stage and terminal constraints
        self.stage_constraint = GenericConstraint(self._model)
        self.terminal_constraint = GenericConstraint(self._model)
        # Lagrange (stage) and Mayer (terminal) cost expressions, accumulated later
        self._lag_term = 0
        self._may_term = 0
        self._minimize_final_time_flag = False
        self._time_varying_parameters_ind = []
        # Box constraints and scalings (filled by set_box_constraints / setup)
        self._x_ub = []
        self._x_lb = []
        self._u_ub = []
        self._u_lb = []
        self.time_varying_parameters = []
        self._x_scaling = []
        self._u_scaling = []
        self._y_scaling = []
        # self._sampling_interval = []
        self._prediction_horizon = []
        self._control_horizon = []
        # Indices of the soft-constraint slack variables in the optimization vector
        self._e_soft_stage_ind = []
        self._e_term_stage_ind = []
        # In some cases, for example path following with interpolated path using spline, the objective
        # function needs to be defined in MX variable instead of SX, because the CasADi spline interpolation
        # is defined only in MX functions.
        self._use_sx = use_sx
        # Save dimensions of the model. This can be useful because the MPC can change these later, but we still need
        # them
        self._n_x = deepcopy(model.n_x)
        self._n_u = deepcopy(model.n_u)
        self._n_y = deepcopy(model.n_y)
        self._n_z = deepcopy(model.n_z)
        self._n_p = deepcopy(model.n_p)
def __str__(self):
"""String representation method"""
if not self._nlp_setup_done:
raise ValueError(f'Howdy! You need to setup {self.__class__.__name__} before printing. '
f'Run {self.__class__.__name__}.setup().')
from prettytable import PrettyTable
import textwrap
x_box_const = PrettyTable()
x_box_const.field_names = ["State", "LB", "UB"]
for k, name in enumerate(self._model.dynamical_state_names):
x_box_const.add_row([name, self._x_lb[k], self._x_ub[k]])
u_box_const = PrettyTable()
u_box_const.field_names = ["Input", "LB", "UB"]
for k, name in enumerate(self._model.input_names):
u_box_const.add_row([name, self._u_lb[k], self._u_ub[k]])
args = "=================================================================\n"
args += " Nonlinear Model Predictive Control \n"
if self._id is not None:
args += f"id='{self._id}'"
if self.name is not None:
args += f", name='{self.name}'"
args += "-----------------------------------------------------------------\n"
args += f"Prediction horizon: {self.prediction_horizon} \n"
args += f"Control horizon: {self.control_horizon} \n"
args += f"Model name: {self._model.name} \n"
args += f"Sampling interval: {self.sampling_interval}\n"
if self.quad_stage_cost.n_of_paths > 0 or self.quad_terminal_cost.n_of_paths > 0:
args += f"Path following problem\n"
if self.quad_stage_cost.n_of_trajectories > 0 or self.quad_terminal_cost.n_of_trajectories > 0:
args += f"Trajectory tracking problem\n"
args += "\n-----------------------------------------------------------------\n"
args += "Objective function"
args += "\n-----------------------------------------------------------------\n"
args += "Lagrange (stage) cost:\n"
args += textwrap.fill(f"{self._lag_term}", width=80, subsequent_indent='\t\t...')
args += "\n"
args += "Mayor (terminal) cost:\n"
args += textwrap.fill(f"{self._may_term}", width=80, subsequent_indent='\t\t...')
args += "\n-----------------------------------------------------------------\n"
args += "Box constraints"
args += "\n-----------------------------------------------------------------\n"
args += "States \n"
args += x_box_const.get_string()
args += "\n"
args += "Inputs \n"
args += u_box_const.get_string()
if self.quad_terminal_cost._has_trajectory_following:
tf_table = PrettyTable()
tf_table.field_names = ["Variable", "Trajectory"]
args += "\t Trajectory following active:"
for element in self.quad_stage_cost._trajectories_list:
for k, name in enumerate(element['names']):
tf_table.add_row([name, element['ref'][k]])
args += tf_table.get_string()
args += "\n"
args += "\n-----------------------------------------------------------------\n"
args += "Initial guesses"
args += "\n-----------------------------------------------------------------\n"
x_guess_table = PrettyTable()
x_guess_table.field_names = ["State", "Guess"]
for k, name in enumerate(self._model.dynamical_state_names):
x_guess_table.add_row([name, self._x_guess[k]])
args += "States \n"
args += x_guess_table.get_string()
args += "\n"
u_guess_table = PrettyTable()
u_guess_table.field_names = ["Input", "Guess"]
for k, name in enumerate(self._model.input_names):
u_guess_table.add_row([name, self._u_guess[k]])
args += "Inputs \n"
args += u_guess_table.get_string()
args += "\n"
args += "\n-----------------------------------------------------------------\n"
args += "Numerical solution and optimization stats"
args += "\n-----------------------------------------------------------------\n"
args += f"Integration: {self._nlp_options['integration_method']} method. \n"
args += f"NLP solver: {self._solver_name}. \n"
args += f"Number of optimization variables: {self._n_v}. \n"
args += "\n=================================================================\n"
return args
    def _update_type(self) -> None:
        """
        Set the controller type identifier to 'NMPC'.

        :return:
        """
        self._type = 'NMPC'
def _define_cost_terms(self):
"""
Creates the SX expressions for the stage or terminal cost
:return:
"""
if self.stage_cost._is_set:
self._lag_term += self.stage_cost.cost
self._lag_term_flag = True
if self.terminal_cost._is_set:
self._may_term += self.terminal_cost.cost
self._may_term_flag = True
self.quad_stage_cost._setup(x_scale=self._x_scaling, u_scale=self._u_scaling, y_scale=self._y_scaling,
time_variable=self.time_var, path_variables=self._paths_var_list)
if self.quad_stage_cost._is_set:
self._lag_term += self.quad_stage_cost.cost
self._lag_term_flag = True
self.quad_terminal_cost._setup(x_scale=self._x_scaling, u_scale=self._u_scaling, y_scale=self._y_scaling,
time_variable=self.time_var, path_variables=self._paths_var_list)
if self.quad_terminal_cost._is_set:
self._may_term += self.quad_terminal_cost.cost
self._may_term_flag = True
if self.quad_stage_cost._has_trajectory_following:
for i in range(self.quad_stage_cost.n_of_trajectories):
fun_sx = self.quad_stage_cost._trajectories_list[i]['ref']
if fun_sx is not None:
self._lag_term = ca.substitute(self._lag_term,
self.quad_stage_cost._trajectories_list[i]['placeholder'],
self.quad_stage_cost._trajectories_list[i]['ref'])
if self.quad_terminal_cost._has_trajectory_following:
for i in range(self.quad_terminal_cost.n_of_trajectories):
fun_sx = self.quad_terminal_cost._trajectories_list[i]['ref']
if fun_sx is not None:
self._may_term = ca.substitute(self._may_term,
self.quad_terminal_cost._trajectories_list[i]['placeholder'],
self.quad_terminal_cost._trajectories_list[i]['ref'])
def _scale_problem(self):
"""
:return:
"""
self._u_ub = scale_vector(self._u_ub, self._u_scaling)
self._u_lb = scale_vector(self._u_lb, self._u_scaling)
self._u_guess = scale_vector(self._u_guess, self._u_scaling)
self._x_ub = scale_vector(self._x_ub, self._x_scaling)
self._x_lb = scale_vector(self._x_lb, self._x_scaling)
self._x_guess = scale_vector(self._x_guess, self._x_scaling)
# ... ode ...
self._model.scale(self._u_scaling, id='u')
self._model.scale(self._x_scaling, id='x')
def _check_mpc_is_well_posed(self):
"""
:return:
"""
if self._lag_term_flag is False and self._may_term_flag is False and self._minimize_final_time_flag is False:
raise ValueError("You need to define a cost function before setting up the mpc.")
if not self._prediction_horizon_is_set:
raise ValueError("You must set a prediction horizon length before")
if not self._control_horizon_is_set:
raise ValueError("You must set a control horizon length before.")
# Check tvp and initialize horizon of tvp values
if self._time_varying_parameters_values is not None:
self._time_varying_parameters_horizon = ca.DM.zeros((self._n_tvp), self.prediction_horizon)
tvp_counter = 0
for key, value in self._time_varying_parameters_values.items():
if len(value) < self.prediction_horizon:
raise TypeError(
f"When passing time-varying parameters, you need to pass a number of values at least "
f"as long as the prediction horizon. The parameter {key} has {len(value)} values but the MPC "
f"has a prediction horizon length of {self._prediction_horizon}."
)
value = self._time_varying_parameters_values[key]
self._time_varying_parameters_horizon[tvp_counter, :] = value[0:self._prediction_horizon]
tvp_counter += 1
    def _get_tvp_parameters_values(self, tvp):
        """
        Given the current iteration, returns the value for every sampling time of all time-varying parameter

        On the first iteration the whole horizon buffer is filled from *tvp*;
        on later iterations the buffer is shifted one step to the left and only
        the last column is refreshed from the stored values, wrapping around to
        the beginning when the stored sequence runs out.

        :param tvp: dict mapping time-varying parameter names to value sequences
        :return:
        """
        ci = self._n_iterations
        # Shift the horizon of tvp one step back
        if ci > 0:
            self._time_varying_parameters_horizon[:, 0:-1] = self._time_varying_parameters_horizon[:, 1:]
            for k, name in enumerate(self._time_varying_parameters):
                value = self._time_varying_parameters_values[name]
                if ci + self.prediction_horizon > len(value):
                    # Not enough stored values left to fill the end of the horizon
                    warnings.warn("The prediction horizon is predicting outside the values of the time varying "
                                  "parameters. I am now taking looping back the values and start from there. "
                                  "See documentation for info.")
                    # NOTE(review): wrap-around index derived from completed horizon
                    # "overtakes" — confirm this restarts at the intended position.
                    n_of_overtakes = int(np.floor(ci / self.prediction_horizon))
                    self._time_varying_parameters_horizon[k, -1] = value[
                        ci - self.prediction_horizon * n_of_overtakes]
                else:
                    # Only the newly revealed last column must be refreshed
                    self._time_varying_parameters_horizon[k, -1] = value[ci + self.prediction_horizon - 1]
        else:
            # First iteration: build the buffer and fill every column
            self._time_varying_parameters_horizon = ca.DM.zeros((self._n_tvp), self.prediction_horizon)
            tvp_counter = 0
            for key, value in tvp.items():
                if len(value) < self.prediction_horizon:
                    raise TypeError(
                        f"When passing time-varying parameters, you need to pass a number of values at least "
                        f"as long as the prediction horizon. The parameter {key} has {len(value)} values but the MPC "
                        f"has a prediction horizon length of {self._prediction_horizon}."
                    )
                value = tvp[key]
                self._time_varying_parameters_horizon[tvp_counter, :] = value[0:self._prediction_horizon]
                tvp_counter += 1
def _parse_tvp_parameters_values(self, tvp):
"""
:param tvp:
:return:
"""
# Get horizon of tvp values
if tvp is not None:
self._time_varying_parameters_horizon = ca.DM.zeros((self._n_tvp), self.prediction_horizon)
tvp_counter = 0
for key, value in tvp.items():
if len(value) < self.prediction_horizon:
raise TypeError(
f"When passing time-varying parameters, you need to pass a number of values at least "
f"as long as the prediction horizon. The parameter {key} has {len(value)} values but the MPC "
f"has a prediction horizon length of {self._prediction_horizon}."
)
value = tvp[key]
self._time_varying_parameters_horizon[tvp_counter, :] = value[0:self._prediction_horizon]
tvp_counter += 1
elif self._time_varying_parameters_values is not None:
self._get_tvp_parameters_values(self._time_varying_parameters_values)
else:
raise ValueError(
f"Mate, I know there are {self._n_tvp} time varying parameters but you did not pass me any."
f"Please provide me with the values of the parameters, either to the optimize() method or to the "
f"set_time_varying_parameters() method."
)
    def _parse_trajectory_values(self, param, **kwargs):
        """
        Fill the trajectory references of stage and terminal costs into *param*.

        References may be passed to optimize() via the keyword arguments
        'ref_sc' (stage cost) and 'ref_tc' (terminal cost) as dicts mapping
        variable names to values; constant references are repeated over the
        horizon, time series are windowed at the current iteration. If no
        references are passed here, each trajectory must already carry one
        (e.g. given as a function at problem definition).

        :param param: NLP parameter structure being populated (mutated in place)
        :param kwargs: may contain 'ref_sc' and/or 'ref_tc' dicts
        :raises TypeError: if a reference container is not a dict
        :raises ValueError: if a reference is missing, unknown or too short
        :raises AssertionError: if time-series references are used without a
            discrete objective function
        :return:
        """
        if self.quad_stage_cost._has_trajectory_following:
            # See if references have been passed to the optimize method
            ref_sc = kwargs.get('ref_sc')
            if ref_sc is not None:
                if not isinstance(ref_sc, dict):
                    raise TypeError(
                        f"The trajectory must be a dict with as key the name of the variables that have a trajectory.")
                for key in ref_sc.keys():
                    if key not in self.quad_stage_cost.name_open_varying_trajectories:
                        raise ValueError(
                            f"I cannot find the variable {key} in the variables with varying trajectory or the "
                            f"trajectory has already been provided as a function. The trajectories without reference "
                            f"are {', '.join(self.quad_stage_cost.name_open_varying_trajectories)}"
                        )
                # NOTE(review): this iterates ALL trajectories — a trajectory whose
                # reference was already given as a function would raise a KeyError
                # on ref_sc[name]; confirm such mixes cannot occur here.
                for traj in self.quad_stage_cost._trajectories_list:
                    for name in traj['names']:
                        traj_i = ref_sc[name]
                        traj_i = check_and_wrap_to_list(traj_i)
                        if len(traj_i) == 1:
                            # The reference is not time varying
                            traj_i = traj_i * self.prediction_horizon
                            param[name + '_sr'] = traj_i
                            traj['ref'] = traj_i
                        else:
                            if self.n_iterations + self.prediction_horizon >= len(traj_i):
                                # The reference is time varying
                                raise ValueError(
                                    f"The length of the varying reference must be one or longer than than the "
                                    f"simulation time plus the prediction horizon. Please supply data points. The "
                                    f"trajectory is long {len(traj_i)} but I am predicting at least up to the "
                                    f"{self.n_iterations + self.prediction_horizon + 1} step."
                                )
                            if self._nlp_options['objective_function'] != 'discrete':
                                raise AssertionError(
                                    "Since you are passing time series as reference points, the objective function has"
                                    "to be set to discrete. Pass `options={'objective_function': 'discrete'}` to the "
                                    "NMPC setup."
                                )
                            # Window the series over the current prediction horizon
                            traj_i = traj_i[self.n_iterations:self.n_iterations + self.prediction_horizon]
                            param[name + '_sr'] = traj_i
                            traj['ref'] = traj_i
            else:
                # No references passed: every trajectory must already have one
                for traj in self.quad_stage_cost._trajectories_list:
                    if traj['ref'] is None:
                        raise ValueError(
                            f"Mate, it looks like the variable(s) {traj['names']} must follow a reference, but"
                            f"you did not pass any. Please pass a reference as a function in the stage cost "
                            f"or as values in the setup of the MPC."
                        )
        if self.quad_terminal_cost._has_trajectory_following:
            ref_tc = kwargs.get('ref_tc')
            if ref_tc is not None:
                if not isinstance(ref_tc, dict):
                    raise TypeError(
                        f"The trajectory must be a dict with as key the name of the variables that have a trajectory.")
                for key in ref_tc.keys():
                    if key not in self.quad_terminal_cost.name_open_varying_trajectories:
                        raise ValueError(
                            f"I cannot find the variable {key} in the variables with varying trajectory or the "
                            f"trajectory has already been provided as a function. The trajectories without reference "
                            f"are {', '.join(self.quad_terminal_cost.name_open_varying_trajectories)}"
                        )
                for traj in self.quad_terminal_cost._trajectories_list:
                    for name in traj['names']:
                        traj_i = ref_tc[name]
                        traj_i = check_and_wrap_to_list(traj_i)
                        if len(traj_i) == 1:
                            # The reference is not time varying
                            param[name + '_tr'] = traj_i
                            traj['ref'] = traj_i
                        else:
                            if self.n_iterations + self.prediction_horizon >= len(traj_i):
                                # The reference is time varying
                                raise ValueError(
                                    f"The length of the verying reference must be one or longer than than the "
                                    f"simulation time plus the prediction horizon. Please supply datapoints. The "
                                    f"trajectory is long {len(traj_i)} but I am predicting at least up to the "
                                    f"{self.n_iterations + self.prediction_horizon + 1} step."
                                )
                            if self._nlp_options['objective_function'] != 'discrete':
                                raise AssertionError(
                                    "Since you are passing time series of reference points, the objective function has "
                                    "to be set to discrete. Check the documentation to see how to set the objective "
                                    "function to discrete."
                                )
                            # NOTE(review): the terminal reference takes a single element
                            # (the value at the end of the horizon), unlike the stage case
                            # above which slices a window — presumably intentional; verify.
                            traj_i = traj_i[self.n_iterations + self.prediction_horizon]
                            param[name + '_tr'] = traj_i
                            traj['ref'] = traj_i
            else:
                # No references passed: every trajectory must already have one
                for traj in self.quad_terminal_cost._trajectories_list:
                    if traj['ref'] is None:
                        raise ValueError(
                            f"Mate, it looks like the variable(s) {traj['names']} must follow a reference, but"
                            f"you did not pass any. Please pass a reference as a function in the terminal "
                            f"cost or as values in the setup of the MPC."
                        )
def _get_nlp_parameters(self, cp, tvp, **kwargs):
"""
This arranges parameters, time-varying parameters and time-varying references in the parameters structure that
goes in the optimization.
:param cp:
:param tvp:
:param kwargs:
:return:
"""
param = self._param_npl_mpc(0)
# Substitute value to the parameters of the nonlinear program
# Input change term
if len(self.quad_stage_cost._ind_input_changes) > 0:
if self._nlp_solution is not None:
param['u_old'] = [self._nlp_solution['x'][self._u_ind[0]][i] for i in
self.quad_stage_cost._ind_input_changes]
else:
param['u_old'] = [self._u_guess[i] for i in self.quad_stage_cost._ind_input_changes]
# Constant parameters term
if cp is not None:
param['c_p'] = cp
# Time-varying parameters term
if self._n_tvp != 0:
self._parse_tvp_parameters_values(tvp)
param['tv_p'] = self._time_varying_parameters_horizon
# Reference for trajectory trackin problems term
self._parse_trajectory_values(param, **kwargs)
# TODO when varying sampling times are implemented, give the possibility to provide varying sampling times.
# Sampling intervals, for unequal sampling times
dt_grid = None
if dt_grid is None:
param['dt'] = ca.repmat(self.sampling_interval, self.prediction_horizon)
param['time'] = self._time
return param
    def _save_references(self, param):
        """
        Saves references into the solution object. For plotting purposes

        Builds full reference matrices for states (over the prediction horizon)
        and inputs (over the control horizon), filling in whatever references
        are defined: time series stored in *param*, trajectory functions,
        constant set points and path-following paths. Entries without a
        reference remain NaN.

        :param param: populated NLP parameter structure (holds the '*_sr' series)
        :return:
        """
        # Save current reference. Used for plotting purposes
        x_ref = ca.DM.nan(self._model_orig.n_x, self.prediction_horizon)
        u_ref = ca.DM.nan(self._model_orig.n_u, self.control_horizon)
        if self.quad_stage_cost._has_trajectory_following:
            # Time-series references stored in the parameter structure
            for i, name in enumerate(self._model_orig.dynamical_state_names):
                if f'{name}_sr' in param.keys():
                    x_ref[i, :] = param[f'{name}_sr']
            # NOTE(review): u_ref has control_horizon columns while the '_sr'
            # series are prediction_horizon long — confirm both horizons agree
            # whenever input references are used.
            for i, name in enumerate(self._model_orig.input_names):
                if f'{name}_sr' in param.keys():
                    u_ref[i, :] = param[f'{name}_sr']
            # References given as callables, evaluated on the predicted time grid
            for traj in self.quad_stage_cost._trajectories_list:
                if traj['traj_fun'] is not None:
                    if traj['type'] == 'states':
                        x_ref[traj['ind'], :] = traj['traj_fun'](self.solution['t'][0:self.prediction_horizon])
                    elif traj['type'] == 'inputs':
                        u_ref[traj['ind'], :] = traj['traj_fun'](self.solution['t'][0:self.control_horizon])
        # Constant set points, repeated over the horizon
        for ref in self.quad_stage_cost._references_list:
            if ref['type'] == 'states':
                x_ref[ref['ind'], :] = ca.repmat(ca.DM(ref['ref']).T, self.prediction_horizon).T
            elif ref['type'] == 'inputs':
                u_ref[ref['ind'], :] = ca.repmat(ca.DM(ref['ref']).T, self.control_horizon).T
        # Path-following references, evaluated at the predicted path parameter
        for path in self.quad_stage_cost._paths_list:
            if path['type'] == 'states':
                x_ref[path['ind'], :] = path['path_fun'](self.solution['thetapfo'][:, :self.prediction_horizon])
            elif path['type'] == 'inputs':
                u_ref[path['ind'], :] = path['path_fun'](self.solution['thetapfo'][:, :self.control_horizon])
        self.solution.add('u_ref', u_ref)
        self.solution.add('x_ref', x_ref)
def _save_predictions(self, cp, tvp):
"""
Store prediction in the solution object. Necessary for plotting.
:return:
"""
# Save prediction in solution
x_pred, u_pred, dt_pred = self.return_prediction()
if self._n_tvp > 0:
p = self._rearrange_parameters_numeric(self._time_varying_parameters_horizon[:, 0], cp)
for ii in range(1, self._prediction_horizon):
p = ca.horzcat(p, self._rearrange_parameters_numeric(self._time_varying_parameters_horizon[:, ii], cp))
else:
p = cp
# Save solution in the solution class
self.solution.add('x', x_pred[:self._n_x, :])
self.solution.add('u', u_pred[:self._n_u, :])
self.solution.add('p', p)
self.solution.add('thetapfo', x_pred[self._n_x:self._n_x + self.n_of_path_vars, :])
if dt_pred is not None:
dt_sum = 0
for i in range(self.prediction_horizon):
self.solution.add('t', self._time + dt_sum)
dt_sum += dt_pred[i]
else:
t_pred = np.linspace(self._time, self._time + (self.prediction_horizon) * self.sampling_interval,
self.prediction_horizon + 1)
self.solution.add('t', ca.DM(t_pred).T)
    @property
    def x_lb(self):
        """
        Lower bounds on the dynamical states.

        :return: list of lower bounds, one entry per state
        """
        return self._x_lb
    @property
    def x_ub(self):
        """
        Upper bounds on the dynamical states.

        :return: list of upper bounds, one entry per state
        """
        return self._x_ub
    @property
    def u_lb(self):
        """
        Lower bounds on the inputs.

        :return: list of lower bounds, one entry per input
        """
        return self._u_lb
    @property
    def u_ub(self):
        """
        Upper bounds on the inputs.

        :return: list of upper bounds, one entry per input
        """
        return self._u_ub
[docs] def set_box_constraints(self, x_ub=None, x_lb=None, u_ub=None, u_lb=None, y_ub=None, y_lb=None, z_ub=None,
z_lb=None):
"""
Set box constraints to the model's variables. These look like
.. math::
x_{lb} \leq x \leq x_{ub}
:param x_ub: upper bound on states.
:type x_ub: list, numpy array or CasADi DM array
:param x_lb: lower bound on states
:type x_lb: list, numpy array or CasADi DM array
:param u_ub: upper bound on inputs
:type u_ub: list, numpy array or CasADi DM array
:param u_lb: lower bound on inputs
:type u_lb: list, numpy array or CasADi DM array
:param y_ub: upper bound on measurements
:type y_ub: list, numpy array or CasADi DM array
:param y_lb: lower bound on measurements
:type y_lb: list, numpy array or CasADi DM array
:param z_ub: upper bound on algebraic states
:type z_ub: list, numpy array or CasADi DM array
:param z_lb: lower bound on algebraic states
:type z_lb: list, numpy array or CasADi DM array
:return:
"""
if x_ub is not None:
x_ub = deepcopy(x_ub)
x_ub = check_and_wrap_to_list(x_ub)
if len(x_ub) != self._n_x:
raise TypeError(f"The model has {self._n_x} states. You need to pass the same number of bounds.")
self._x_ub = x_ub
else:
self._x_ub = self._model.n_x * [ca.inf]
if x_lb is not None:
x_lb = deepcopy(x_lb)
x_lb = check_and_wrap_to_list(x_lb)
if len(x_lb) != self._n_x:
raise TypeError(f"The model has {self._n_x} states. You need to pass the same number of bounds.")
self._x_lb = x_lb
else:
self._x_lb = self._model.n_x * [-ca.inf]
# Input constraints
if u_ub is not None:
u_ub = deepcopy(u_ub)
u_ub = check_and_wrap_to_list(u_ub)
if len(u_ub) != self._n_u:
raise TypeError(f"The model has {self._n_u} inputs. You need to pass the same number of bounds.")
self._u_ub = u_ub
else:
self._u_ub = self._model.n_u * [ca.inf]
if u_lb is not None:
u_lb = deepcopy(u_lb)
u_lb = check_and_wrap_to_list(u_lb)
if len(u_lb) != self._n_u:
raise TypeError(f"The model has {self._n_u} inputs. You need to pass the same number of bounds.")
self._u_lb = u_lb
else:
self._u_lb = self._model.n_u * [-ca.inf]
# Algebraic constraints
if z_ub is not None:
z_ub = deepcopy(z_ub)
z_ub = check_and_wrap_to_list(z_ub)
if len(z_ub) != self._n_z:
raise TypeError(f"The model has {self._n_z} algebraic states. You need to pass the same number of "
f"bounds.")
self._z_ub = z_ub
else:
self._z_ub = self._model.n_z * [ca.inf]
if z_lb is not None:
z_lb = deepcopy(z_lb)
z_lb = check_and_wrap_to_list(z_lb)
if len(z_lb) != self._n_z:
raise TypeError(f"The model has {self._n_z} algebraic states. You need to pass the same number of "
f"bounds.")
self._z_lb = z_lb
else:
self._z_lb = self._model.n_z * [-ca.inf]
if y_lb is not None or y_ub is not None:
# Measurement box constraints can be added by an extra stage and terminal constraint (possibly nonlinear)
self.set_stage_constraints(stage_constraint=self._model.meas, ub=deepcopy(y_ub), lb=deepcopy(y_lb),
name='measurement_constraint')
self.set_terminal_constraints(terminal_constraint=self._model.meas, ub=deepcopy(y_ub), lb=deepcopy(y_lb),
name='measurement_constraint')
self._box_constraints_is_set = True
def _optimize(self, v0, runs, param, **kwargs):
"""
:param v0:
:param runs:
:param param:
:param kwargs:
:return:
"""
if runs == 0:
sol = self._solver(x0=v0, lbx=self._v_lb, ubx=self._v_ub, lbg=self._g_lb, ubg=self._g_ub, p=param)
self._nlp_solution = sol
u_opt = sol['x'][self._u_ind[0]]
if self._nlp_options['warm_start']:
self._v0 = sol['x']
else:
pert_factor = kwargs.get('pert_factor', 0.1)
f_r_better = np.inf
v00 = v0
for r in range(runs):
sol = self._solver(x0=v00, lbx=self._v_lb, ubx=self._v_ub, lbg=self._g_lb, ubg=self._g_ub, p=param)
self._solver_status_wrapper()
if self._solver_status_code in [1,2] and sol['f'] < f_r_better:
f_r_better = sol['f']
self._nlp_solution = sol
u_opt = sol['x'][self._u_ind[0]]
if self._nlp_options['warm_start']:
self._v0 = sol['x']
v00 = clip(v0 + v0 * (1 - 2 * np.random.rand(self._n_v)) * pert_factor, self._v_lb, self._v_ub)
return u_opt
[docs] def optimize(self, x0, cp=None, tvp=None, v0=None, runs=0, fix_x0=True, **kwargs):
"""
Solves the MPC problem
:param x0: current system state
:type x0: list or casadi DM
:param cp: constant system parameters (these will be assumed constant along the prediction horizon)
:type cp: list or casadi DM, optional
:param tvp: time-varying system parameters (these can change during prediction horizon, entire parameter
history must be passed in)
:type tvp: dict, optional
:param v0: initial guess of the optimal vector
:type v0: list or casadi DM, optional
:param runs: number of optimizations to run. If different from zero will run very optimization will perturb the
initial guess v0 randomly.
ACHTUNG: This could cause problems with the integrators or give something outside constraints. The output
will be the solution with the minimum objective function (default 0)
:type runs: int
:param fix_x0: If True, the first state is fixed as the measured states. This is the classic MPC approach. If
False, also the initial state is optimized.
:type fix_x0: bool
:return: u_opt: first piece of optimal control sequence
"""
if not self._nlp_setup_done:
raise ValueError("Howdy! You need to setup the MPC before optimizing. Run .setup() on the MPC object.")
# Check the constant parameters
if self._model.n_p - self._n_tvp != 0:
if cp is not None:
cp = check_and_wrap_to_DM(cp)
if cp is None or cp.size1() != self._model.n_p - self._n_tvp:
raise ValueError(
f"The model has {self._model.n_p - self._n_tvp} constant parameter(s): "
f"{self._model.parameter_names}. You must pass me the value of these before running the "
f"optimization to the 'cp' parameter."
)
else:
if cp is not None:
warnings.warn("You are passing a parameter vector in the optimizer, but the model has no defined "
"parameters. I am ignoring the vector.")
if self._nlp_options['ipopt_debugger']:
# Reset the solution of the debugger before next optimization
self.debugger.reset_solution()
# Check the state
x0 = check_and_wrap_to_DM(x0)
if x0.shape[0] != self._n_x:
raise ValueError(
f"We have an issue mate, the x0 you supplied has dimension {x0.shape[0]} but the model has {self._n_x} "
f"states."
)
if fix_x0 is True:
# Force the initial state to be the measured state.
# Note that the path following problem modifies the model dimensions,
# so only the first n_x positions need to be forced where n_x is state number of the original problem
self._v_lb[self._x_ind[0][0:self._n_x]] = x0 / ca.DM(self._x_scaling[0:self._n_x])
self._v_ub[self._x_ind[0][0:self._n_x]] = x0 / ca.DM(self._x_scaling[0:self._n_x])
else:
x0_lb = kwargs.get('x0_lb', self._x_lb)
x0_ub = kwargs.get('x0_ub', self._x_ub)
self._v_lb[self._x_ind[0][0:self._n_x]] = x0_lb / ca.DM(self._x_scaling[0:self._n_x])
self._v_ub[self._x_ind[0][0:self._n_x]] = x0_ub / ca.DM(self._x_scaling[0:self._n_x])
# Check and prepare parameters
param = self._get_nlp_parameters(cp, tvp, **kwargs)
if v0 is None:
v0 = self._v0
if self._stats:
start = time.time()
# Compute the MPC
u_opt = self._optimize(v0, runs, param)
# Get the status of the solver
self._solver_status_wrapper()
# Print output
self._print_message()
# Reset the solution
self.reset_solution()
# Populate solution with new results
if self._stats:
elapsed_time = time.time() - start
self.solution.add('extime', elapsed_time)
self.solution.add('niterations', self._n_iterations)
self.solution.add('solvstatus', self._solver_status_code)
if self.stage_constraint.is_set and self.stage_constraint.is_soft:
self.stage_constraint.e_soft_value = self._nlp_solution['x'][self._e_soft_stage_ind[0]]
if self.terminal_constraint.is_set and self.terminal_constraint.is_soft:
self.terminal_constraint.e_soft_value = self._nlp_solution['x'][self._e_soft_term_ind[0]]
# Save predictions in the solution object. Done for plotting purposes.
self._save_predictions(cp, tvp)
# Save the references in the solution object. Done for plotting purposes.
self._save_references(param)
# Update the time clock - Useful for time-varying systems and references.
self._time += self.sampling_interval
# Interation counter
self._n_iterations += 1
# Extract first input from the optimal sequence
uopt = u_opt[0:self._n_u] * self._u_scaling[0:self._n_u]
return uopt
[docs] def minimize_final_time(self, weight=1):
"""
:param weight:
:return:
"""
self._minimize_final_time_flag = True
self._minimize_final_time_weight = weight
def plot_prediction(self, save_plot=False, plot_dir=None, name_file='mpc_prediction.html', show_plot=True,
                    extras=None, extras_names=None, title=None, format_figure=None, **kwargs):
    """
    Plots the MPC predicted values.

    :param save_plot: if True the plot will be saved under 'plot_dir/name_file' if declared, otherwise in the
        current directory
    :type save_plot: bool
    :param plot_dir: path to the folder where plots are saved (default = None)
    :type plot_dir: str
    :param name_file: name of the file where the plot will be saved (default = 'mpc_prediction.html')
    :type name_file: str
    :param show_plot: if True, shows plots (default = True)
    :type show_plot: bool
    :param extras: dictionary (or list of dictionaries) with values that will be plotted over the predictions
        if keys are equal to predicted states/inputs
    :type extras: dict or list
    :param extras_names: tags that will be attached to the extras in the legend
    :type extras_names: list
    :param title: title of the plots
    :type title: str
    :param format_figure: python function that modifies the format of the figure
    :type format_figure: python function taking a bokeh figure object as an input
    :return: None
    :raises RuntimeError: if the MPC has never been solved
    :raises ValueError: if ``extras``/``extras_names`` are inconsistent
    """
    import os
    from bokeh.io import output_file, output_notebook, show, save
    from bokeh.plotting import figure
    from bokeh.models import ColumnDataSource, DataTable, TableColumn, CellFormatter, Div
    from bokeh.layouts import gridplot, column, row
    from bokeh.palettes import Spectral4 as palettespectral

    if self._nlp_solution is None:
        raise RuntimeError("You need to run the MPC at least once to see the plots")

    # Select the bokeh output target before any figure is created.
    if save_plot:
        if plot_dir is not None:
            output_file(os.path.join(plot_dir, name_file))
        else:
            output_file(name_file)
    else:
        if kwargs.get('output_notebook', False):
            output_notebook()

    # Normalize the extras to a list of dictionaries.
    if extras is None:
        extras = []
    if extras_names is None:
        extras_names = []
    if isinstance(extras, dict):
        extras = [extras]
    elif not isinstance(extras, list):
        raise ValueError("The extras options should be a dictionary or a list of dictionaries")
    if len(extras) != len(extras_names):
        raise ValueError("The length of the extra and extras_names must be the same.")

    # Keep, for every extras entry, only the keys that match a model state.
    res_extras_list = [
        {name: entry[name] for name in self._model.dynamical_state_names if name in entry}
        for entry in extras
    ]

    current_time = self._time
    x_pred, u_pred, dt_pred = self.return_prediction()
    if dt_pred is None:
        # Uniform sampling: equally spaced grid over the prediction horizon.
        time_vector = np.linspace(current_time, current_time + self._prediction_horizon * self.sampling_interval,
                                  self._prediction_horizon + 1)
    else:
        # Variable sampling times (e.g. minimal-time problems): accumulate the optimized dt's.
        time_vector = [self._time]
        for i in range(self._prediction_horizon):
            time_vector.append(time_vector[i] + dt_pred[i])

    states_dict = {name: x_pred[k, :] for k, name in enumerate(self._model.dynamical_state_names)}
    input_dict = {name: u_pred[k, :] for k, name in enumerate(self._model.input_names)}

    # One figure per predicted state.
    p1 = [figure(title=title, background_fill_color="#fafafa") for _ in range(self._model.n_x)]
    for s, name in enumerate(self._model.dynamical_state_names):
        p1[s].line(x=time_vector,
                   y=states_dict[name],
                   legend_label=name + '_pred', line_width=2)
        # Overlay the (constant) reference if one was defined for this state.
        for ref_def in self.quad_stage_cost._references_list:
            if name in ref_def['names']:
                position = ref_def['names'].index(name)
                value = ref_def['ref'][position]
                p1[s].line([time_vector[0], time_vector[-1]], [value, value], legend_label=name + '_ref',
                           line_dash='dashed', line_color="red", line_width=2)
        # Overlay any user-supplied extra series for this state.
        for i, res_extras in enumerate(res_extras_list):
            if name in res_extras:
                p1[s].line(x=time_vector,
                           y=res_extras[name], line_width=2, color=palettespectral[i + 1],
                           legend_label=name + '_' + extras_names[i])
        p1[s].yaxis.axis_label = name
        p1[s].xaxis.axis_label = 'time'
        if format_figure is not None:
            p1[s] = format_figure(p1[s])

    # One figure per predicted input (piecewise-constant over the control intervals).
    p2 = [figure(title=title, background_fill_color="#fafafa") for _ in range(self._model.n_u)]
    for s, name in enumerate(self._model.input_names):
        p2[s].step(x=time_vector[:-1], y=input_dict[name],
                   legend_label=name + '_pred', mode="after", line_width=2)
        p2[s].yaxis.axis_label = name
        p2[s].xaxis.axis_label = 'time'
        if format_figure is not None:
            p2[s] = format_figure(p2[s])

    # Create some data to print statistics.
    variables = []
    values = []
    if self.stage_constraint.is_soft:
        variables.append('Slack soft constraint')
        values.append(float(np.array(self.stage_constraint.e_soft_value).squeeze()))
    heading = Div(text="MPC stats", height=80, sizing_mode="stretch_width", align='center',
                  style={'font-size': '200%'})
    source = ColumnDataSource(dict(variables=variables, values=values))
    columns = [
        TableColumn(field="variables", title="Variables"),
        TableColumn(field="values", title="Values", formatter=CellFormatter()),
    ]
    data_table = DataTable(source=source, columns=columns, width=400, height=280)
    grid_states = gridplot(p1, ncols=3, sizing_mode="stretch_width")
    grid_inputs = gridplot(p2, ncols=3, sizing_mode="stretch_width")

    # Compose the final layout once so it can be both shown and saved.
    states_header = Div(text="Predicted States", height=10, sizing_mode="stretch_width", align='center',
                        style={'font-size': '200%'})
    inputs_header = Div(text="Predicted Inputs", height=10, sizing_mode="stretch_width", align='center',
                        style={'font-size': '200%'})
    layout = row(column(states_header, grid_states, inputs_header, grid_inputs), column(heading, data_table))
    if show_plot:
        show(layout)
    elif save_plot:
        # BUGFIX: the original called save(grid), passing the bokeh layout *function* `grid`
        # instead of a layout object, which raised a TypeError. Save the composed layout instead.
        save(layout)
def create_path_variable(self, name='theta', u_pf_lb=0.0001, u_pf_ub=1, u_pf_ref=None, u_pf_weight=10,
                         theta_guess=0, theta_lb=0, theta_ub=ca.inf):
    r"""
    Create and register the path variable for path-following problems. The returned symbol
    must be used when building the SX/MX expression of the path.

    :param name: name of the variable (default = 'theta')
    :type name: str
    :param u_pf_lb: lower bound on the path virtual input :math:`vel_{lb} \leq \dot{\theta}` (default = 0.0001)
    :type u_pf_lb: float
    :param u_pf_ub: upper bound on the path virtual input :math:`vel_{ub} \geq \dot{\theta}` (default = 1)
    :type u_pf_ub: float
    :param u_pf_ref: reference for the path virtual input (default = None)
    :type u_pf_ref: float
    :param u_pf_weight: weight for the path virtual input (default = 10)
    :type u_pf_weight: float
    :param theta_guess: initial guess for the path variable (default = 0)
    :param theta_lb: lower bound on the path variable (default = 0)
    :param theta_ub: upper bound on the path variable (default = inf)
    :return: the newly created path variable
    :rtype: casadi.SX or casadi.MX depending on the controller settings
    """
    # Create the symbol with the same symbolic kind the controller was configured with.
    sym_factory = ca.SX.sym if self._use_sx else ca.MX.sym
    theta = sym_factory(name)
    self._paths_var_list.append({
        'theta': theta,
        'u_pf_lb': u_pf_lb,
        'u_pf_ub': u_pf_ub,
        'u_pf_ref': u_pf_ref,
        'u_pf_weight': u_pf_weight,
        'theta_guess': theta_guess,
        'theta_lb': theta_lb,
        'theta_ub': theta_ub,
    })
    return theta
def get_time_variable(self):
    """
    Return the model's time variable and cache it on the controller.
    Useful for trajectory tracking.

    :return: the time variable of the underlying model
    """
    time_sym = self._model.t
    self._time_var = time_sym
    return time_sym
def set_quadratic_stage_cost(self, states=None, cost_states=None, states_references=None,
                             inputs=None, cost_inputs=None, inputs_references=None):
    """
    Compact helper to define a quadratic stage cost. Mostly kept for backwards
    compatibility; intended for set-point-tracking problems only.

    :param states: names of the states entering the quadratic cost
    :type states: list
    :param cost_states: weights multiplying the state errors
    :type cost_states: list, numpy array or casadi.DM
    :param states_references: reference values for the states
    :type states_references: list
    :param inputs: names of the inputs entering the quadratic cost
    :type inputs: list
    :param cost_inputs: weights multiplying the input errors
    :type cost_inputs: list, numpy array or casadi.DM
    :param inputs_references: reference values for the inputs
    :type inputs_references: list
    :return: None
    """
    # Delegate to the quadratic cost object; state and input terms are registered separately.
    cost = self.quad_stage_cost
    cost.add_states(names=states, weights=cost_states, ref=states_references)
    cost.add_inputs(names=inputs, weights=cost_inputs, ref=inputs_references)
def set_quadratic_terminal_cost(self, states=None, cost=None, references=None):
    """
    Compact helper to define a quadratic terminal cost. Mostly kept for backwards
    compatibility; intended for set-point-tracking problems only.

    :param states: names of the states entering the quadratic terminal cost
    :type states: list
    :param cost: weights multiplying the state errors
    :type cost: list, numpy array or casadi.DM
    :param references: reference values for the states
    :type references: list
    :return: None
    """
    # Only states can enter the terminal cost; delegation does all the bookkeeping.
    self.quad_terminal_cost.add_states(names=states, weights=cost, ref=references)
def set_terminal_constraints(self, terminal_constraint, name='terminal_constraint', lb=None, ub=None, is_soft=False,
                             max_violation=ca.inf,
                             weight=None):
    """
    Add a (nonlinear) terminal constraint.

    :param terminal_constraint: expression built from variables of the model
    :type terminal_constraint: casadi SX expression
    :param name: identifier of the constraint (default = 'terminal_constraint')
    :type name: str
    :param lb: lower bound on the constraint
    :type lb: list of float, int or casadi.DM
    :param ub: upper bound on the constraint
    :type ub: list of float, int or casadi.DM
    :param is_soft: if True the constraint is softened with slack variables (default = False)
    :type is_soft: bool
    :param max_violation: (optional) maximum violation if the constraint is soft (default = inf)
    :type max_violation: list of float, int or casadi.DM
    :param weight: (optional) matrix of appropriate dimension; if is_soft=True it weights the
        soft constraint in the objective function using a quadratic cost
    :type weight: casadi.DM
    :return: None
    """
    # TODO check if all the variables used in the function are in the model
    # TODO allow to pass either only the lower or the upper bound
    constraint = self.terminal_constraint
    constraint.constraint = terminal_constraint
    constraint.lb = lb
    constraint.ub = ub
    constraint.is_soft = is_soft
    constraint.max_violation = max_violation
    constraint.weight = weight
    constraint.name = name
def _setup(self, options=None, solver_options=None) -> None:
    """
    Sets up the corresponding optimization problem (OP) of the MPC. This must be run before attempting to solve
    the MPC.

    :param options: Options for MPC. See documentation.
    :type options: dict
    :param solver_options: Dictionary with options for the optimizer. These options are solver specific. Refer to
        the CasADi Documentation https://web.casadi.org/python-api/#nlp
    :type solver_options: dict
    :return: None
    """
    # Fall back to defaults for everything the user has not configured explicitly.
    if not self._scaling_is_set:
        self.set_scaling()
    if not self._time_varying_parameters_is_set:
        self.set_time_varying_parameters()
    if not self._box_constraints_is_set:
        self.set_box_constraints()
    if not self._initial_guess_is_set:
        self.set_initial_guess()
    if not self._nlp_options_is_set:
        self.set_nlp_options(options)
    if not self._solver_options_is_set:
        self.set_solver_opts(solver_options)
    if not self._sampling_time_is_set:
        self.set_sampling_interval()
    self._populate_solution()
    # Path following
    # Keep copies of the original scaling/bounds: the path-following extension below appends
    # entries for the virtual path states and inputs.
    self._x_scaling_orig = deepcopy(self._x_scaling)
    self._u_scaling_orig = deepcopy(self._u_scaling)
    self._x_lb_orig = deepcopy(self._x_lb)
    self._x_ub_orig = deepcopy(self._x_ub)
    self._u_lb_orig = deepcopy(self._u_lb)
    self._u_ub_orig = deepcopy(self._u_ub)
    # Augment the model with one extra state (the path parameter) and one extra input (its
    # virtual velocity) per registered path variable.
    for i in range(self.n_of_path_vars):
        theta = self._paths_var_list[i]['theta']
        theta_vel_ub = self._paths_var_list[i]['u_pf_ub']
        theta_vel_lb = self._paths_var_list[i]['u_pf_lb']
        theta_lb = self._paths_var_list[i]['theta_lb']
        theta_ub = self._paths_var_list[i]['theta_ub']
        theta_guess = self._paths_var_list[i]['theta_guess']
        # If path following the model increases of dimension to allow the path variable
        self._model.add_dynamical_states(theta)
        if self._use_sx:
            u_theta = ca.SX.sym('u_theta')
        else:
            u_theta = ca.MX.sym('u_theta')
        self._model.add_inputs(u_theta)
        if self._model.discrete:
            # TODO: here I am doing simply explicit euler. Maybe one could use the same discretization method of
            #  the model
            self._model.add_dynamical_equations(theta + self.sampling_interval * u_theta)
        else:
            self._model.add_dynamical_equations(u_theta)
        # Guesses, bounds and (unit) scaling for the appended path state/input.
        self._x_guess.append(theta_guess)
        self._u_guess.append(theta_vel_lb + 0.0001)
        self._u_lb.append(theta_vel_lb)
        self._u_ub.append(theta_vel_ub)
        self._x_lb.append(theta_lb)
        self._x_ub.append(theta_ub)
        self._u_scaling.append(1)
        self._x_scaling.append(1)
        # Penalize deviation of the virtual path velocity from its reference, if given.
        if self._paths_var_list[i]['u_pf_ref'] is not None:
            self.stage_cost.cost = (u_theta - self._paths_var_list[i]['u_pf_ref']) ** 2 * self._paths_var_list[i][
                'u_pf_weight']
    # Define cost terms
    self._define_cost_terms()
    # Scaling...
    self._scale_problem()
    # Define custom stage and terminal constraints
    self.stage_constraint._check_and_setup(x_scale=self._x_scaling, u_scale=self._u_scaling,
                                           y_scale=self._y_scaling)
    self.terminal_constraint._check_and_setup(x_scale=self._x_scaling, u_scale=self._u_scaling,
                                              y_scale=self._y_scaling)
    self._check_mpc_is_well_posed()
    if self._nlp_options['ipopt_debugger']:
        # This dict saves the positions of each constraint group inside the stacked constraint
        # vector g, so the debugger can map residuals/multipliers back to their origin.
        self._g_indices = {'dynamics_collocation': [],
                           'dynamics_multiple_shooting': [],
                           'nonlin_stag_const': [],
                           'nonlin_term_const': [],
                           'time_const.': []}
    # Build the NLP only once; subsequent calls reuse the stored problem data.
    if self._nlp_setup_done is False:
        model = self._model
        # ... objective function.
        if self._may_term_flag:
            # Substitute references
            if self.quad_terminal_cost.ref_placeholder.shape[0] != 0:
                references = self.quad_terminal_cost.ref_placeholder
            else:
                # No references used: create an empty symbol of the matching kind.
                if self._use_sx:
                    references = ca.SX.sym('r', 0)
                else:
                    references = ca.MX.sym('r', 0)
            self._may_term_fun = ca.Function('mayor_term',
                                             [model.t, model.x,
                                              references],
                                             [self._may_term])
        # Check time varying parameters
        # tvp_ind = []
        # TODO this should be moved in the optimiziation method
        if len(self._time_varying_parameters) != 0:
            p_names = model.parameter_names
            for tvp in self._time_varying_parameters:
                assert tvp in p_names, f"The time-varying parameter {tvp} is not in the model parameter. " \
                                       f"The model parameters are {p_names}."
        # Define casadi function for the terminal nonlinear constraints.
        if self.terminal_constraint.is_set:
            if self.terminal_constraint.is_soft:
                # NOTE(review): `model.t.sym(...)` apparently calls the static SX/MX `.sym`
                # through an instance to create slack symbols of the same kind as the model — confirm.
                e_terminal = model.t.sym('e_terminal', self.terminal_constraint.constraint.size1())
                # Soft constraint is split into (c - e) and (-c - e) so a single slack bounds |c|.
                self._terminal_constraints_fun = ca.Function('soft_terminal_const_term',
                                                             [model.t, model.x, model.z, model.p, e_terminal],
                                                             [ca.vertcat(self.terminal_constraint.constraint -
                                                                         e_terminal,
                                                                         -self.terminal_constraint.constraint -
                                                                         e_terminal)])
            else:
                self._terminal_constraints_fun = ca.Function('terminal_const_term',
                                                             [model.t, model.x, model.z, model.p],
                                                             [self.terminal_constraint.constraint])
        # Define casadi function for auxiliary stage nonlinear constraints
        if self.stage_constraint.is_set:
            if self.stage_constraint.is_soft:
                e_stage = model.t.sym('e_stage', self.stage_constraint.constraint.size1())
                self._stage_constraints_fun = ca.Function('soft_stage_nl_constr',
                                                          [model.t, model.x, model.u, model.z, model.p, e_stage],
                                                          [ca.vertcat(self.stage_constraint.constraint - e_stage,
                                                                      -self.stage_constraint.constraint - e_stage)])
            else:
                self._stage_constraints_fun = ca.Function('stage_nl_constr',
                                                          [model.t, model.x, model.u, model.z, model.p],
                                                          [self.stage_constraint.constraint])
        # Lagrange (stage) cost: registered as a quadrature state so integrators accumulate it.
        if self._lag_term_flag:
            model.set_quadrature_function(self._lag_term)
            references = self.quad_stage_cost.ref_placeholder
            u_old = self.quad_stage_cost.input_change_placeholder
        else:
            references = []
            u_old = []
        # Snapshot of the model's symbolic pieces; mutated in place by continuous2discrete below.
        problem = dict(model)
        if self._lag_term_flag:
            if self.quad_stage_cost.ref_placeholder.shape[0] != 0:
                references = self.quad_stage_cost.ref_placeholder
            else:
                if self._use_sx:
                    references = ca.SX.sym('r', 0)
                else:
                    references = ca.MX.sym('r', 0)
            u_old = self.quad_stage_cost.input_change_placeholder
            self._lag_term_fun = ca.Function('lagrange_term',
                                             [problem['t'], problem['x'],
                                              problem['u'], problem['z'], problem['p'],
                                              references, u_old],
                                             [self._lag_term])
        # --- Build the one-interval integrator according to the chosen discretization ---
        if self._nlp_options['integration_method'] == 'collocation':
            continuous2discrete(problem, **self._nlp_options)
            # Slack for soft constraints
            if self._use_sx:
                ek = ca.SX.sym('e', self.stage_constraint.size)
            else:
                ek = ca.MX.sym('e', self.stage_constraint.size)
            # Add all constraints to the collocation points
            # box constraints
            # TODO is here options['degree']+ 1 or problem['ode']???
            n_xik = (self._nlp_options['degree']) * (model.n_x)
            n_zik = (self._nlp_options['degree']) * (model.n_z)
            x_ik_guess = np.tile(self._x_guess, self._nlp_options['degree'])
            x_ik_ub = np.tile(self._x_ub, self._nlp_options['degree'])
            x_ik_lb = np.tile(self._x_lb, self._nlp_options['degree'])
            if model.n_z > 0:
                z_ik_guess = np.tile(self._z_guess, self._nlp_options['degree'])
                z_ik_ub = np.tile(self._z_ub, self._nlp_options['degree'])
                z_ik_lb = np.tile(self._z_lb, self._nlp_options['degree'])
            # nonlinear constraints
            # Constraints in the control interval
            gk_col = []
            gk_col_lb = []
            gk_col_ub = []
            # Enforce the stage constraints at every collocation point of the interval.
            for k in range(self._nlp_options['degree']):
                x_col = problem['collocation_points_ode'][k]
                z_col = problem['collocation_points_alg'][k]
                if self.stage_constraint.is_set:
                    if self.stage_constraint.is_soft:
                        residual = self._stage_constraints_fun(problem['t'], x_col, problem['u'], z_col,
                                                               problem['p'], ek)
                        gk_col.append(residual)
                        gk_col_lb.append(np.repeat(-np.inf, self.stage_constraint.size * 2))
                        gk_col_ub.append(self.stage_constraint.ub)
                        gk_col_ub.append([-lb for lb in self.stage_constraint.lb])
                    else:
                        residual = self._stage_constraints_fun(problem['t'], x_col, problem['u'], z_col,
                                                               problem['p'])
                        gk_col.append(residual)
                        gk_col_lb.append(self.stage_constraint.lb)
                        gk_col_ub.append(self.stage_constraint.ub)
            # The collocation equations of the whole interval enter as equality constraints.
            gk_col.append(problem['collocation_equations'])
            gk_col_lb.append(np.zeros(problem['collocation_equations'].shape[0]))
            gk_col_ub.append(np.zeros(problem['collocation_equations'].shape[0]))
            # Create function
            int_dynamics_fun = ca.Function("integrator_collocation",
                                           [problem['t'],  # time variable (for time varying systems)
                                            problem['dt'],  # dt variable (for possibly different sampling time)
                                            ca.vertcat(*problem['collocation_points_ode']),
                                            # x at collocation points
                                            problem['x'],  # x at the begining of the interval
                                            problem['u'],  # input of the interval
                                            ca.vertcat(*problem['collocation_points_alg']),  # alg states at coll.
                                            problem['p'],  # parameters (constant over the interval at least)
                                            ek,  # slack variable for (possibly) soft constrained systems
                                            references,
                                            u_old],
                                           [ca.vertcat(*gk_col), problem['ode'], problem['quad']])
        elif self._nlp_options['integration_method'] == 'discrete':
            # Model is already discrete-time: the 'integrator' is the map itself.
            n_zik = model.n_z
            z_ik_guess = self._z_guess
            z_ik_ub = self._z_ub
            z_ik_lb = self._z_lb
            int_dynamics_fun = ca.Function('integrator_discrete',
                                           [problem['t'],
                                            problem['dt'],
                                            problem['x'],
                                            problem['u'],
                                            problem['z'],
                                            problem['p'],
                                            ],
                                           [problem['ode']])
        elif self._nlp_options['integration_method'] in ['rk', 'rk4']:
            continuous2discrete(problem, **self._nlp_options)
            # One set of algebraic variables per RK stage ('order' stages).
            n_zik = (self._nlp_options['order']) * model.n_z
            z_ik_guess = np.tile(self._z_guess, self._nlp_options['order'])
            z_ik_ub = np.tile(self._z_ub, self._nlp_options['order'])
            z_ik_lb = np.tile(self._z_lb, self._nlp_options['order'])
            # Create function
            int_dynamics_fun = ca.Function("integrator_collocation",
                                           [
                                               problem['t'],  # time variable (for time varying systems)
                                               problem['dt'],  # dt variable (for possibly different sampling time)
                                               problem['x'],  # x at the beginning of the interval
                                               problem['u'],  # input of the interval
                                               problem['discretization_points'],  # algebraic variables
                                               problem['z'],
                                               problem['p'],  # parameters (constant over the interval at least)
                                               references,
                                               u_old
                                           ],
                                           [problem['alg'], problem['ode'], problem['quad']])
        elif self._nlp_options['integration_method'] in ['idas', 'cvodes']:
            # Use a SUNDIALS integrator; all non-state quantities are passed through 'p'.
            n_zik = model.n_z
            z_ik_guess = self._z_guess
            z_ik_ub = self._z_ub
            z_ik_lb = self._z_lb
            if model.n_z == 0:
                dae = {'t': problem['t'],
                       'x': problem['x'],
                       'p': ca.vertcat(problem['u'], problem['p'], u_old, references, problem['dt']),
                       'ode': model.ode,
                       'quad': self._lag_term}
            else:
                dae = {'t': problem['t'],
                       'x': ca.vertcat(problem['x'], problem['z']),
                       'p': ca.vertcat(problem['u'], problem['p'], u_old, references, problem['dt']),
                       'ode': ca.vertcat(model.ode, model.alg),
                       'quad': self._lag_term}
            opts = {'abstol': 1e-10, 'reltol': 1e-10, 'tf': self._sampling_interval}
            int_dynamics_fun = ca.integrator('integrator_ms', self._nlp_options['integration_method'], dae, opts)
        else:
            raise ValueError(f"Integration {self._nlp_options['integration_method']} not defined.")
        # Total number of optimization variable
        n_v = self._control_horizon * model.n_u + (self._prediction_horizon + 1) * (model.n_x + model.n_z)
        if self._nlp_options['integration_method'] == 'collocation':
            n_v += self._prediction_horizon * (n_xik + n_zik)
        if self._nlp_options['integration_method'] in ['rk', 'rk4']:
            n_v += self._prediction_horizon * (n_zik)
        if self.stage_constraint.is_soft:
            n_v += self.stage_constraint.constraint.size1()
        if self.terminal_constraint.is_soft:
            n_v += self.terminal_constraint.constraint.size1()
        if self._custom_constraint_is_soft_flag:
            n_v += self._custom_constraint_size
        if self._minimize_final_time_flag:
            n_v += self._prediction_horizon
        # Single stacked decision-variable vector; slices of it are carved out below.
        v = ca.MX.sym('v', n_v)
        v_lb = np.zeros(n_v)
        v_ub = np.zeros(n_v)
        v_guess = np.zeros(n_v)
        offset = 0
        # Predefine optimization variable - Those always exist, independently of the method used
        x = np.resize(np.array([], dtype=ca.MX), (self._prediction_horizon + 1, 1))
        x_ind = []
        for ii in range(self._prediction_horizon + 1):
            x[ii, 0] = v[offset:offset + model.n_x]
            x_ind.append([j for j in range(offset, offset + model.n_x)])
            v_guess[offset:offset + model.n_x] = self._x_guess
            v_lb[offset:offset + model.n_x] = self._x_lb
            v_ub[offset:offset + model.n_x] = self._x_ub
            if self._mixed_integer_flag:
                self._discrete_variables_bool.extend([False] * model._n_x)
            offset += model.n_x
        u = np.resize(np.array([], dtype=ca.MX), (self._control_horizon, 1))
        u_ind = []
        for ii in range(self._control_horizon):
            u[ii, 0] = v[offset:offset + model.n_u]
            u_ind.append([j for j in range(offset, offset + model.n_u)])
            v_guess[offset:offset + model.n_u] = self._u_guess
            v_lb[offset:offset + model.n_u] = self._u_lb
            v_ub[offset:offset + model.n_u] = self._u_ub
            if self._mixed_integer_flag:
                self._discrete_variables_bool.extend(model.discrete_u)
            offset += model.n_u
        if model.n_z > 0:
            z = np.resize(np.array([], dtype=ca.MX), (self._prediction_horizon + 1, 1))
            z_ind = []
            for ii in range(self._prediction_horizon + 1):
                z[ii, 0] = v[offset:offset + model.n_z]
                z_ind.append([j for j in range(offset, offset + model.n_z)])
                v_guess[offset:offset + model.n_z] = self._z_guess
                v_lb[offset:offset + model.n_z] = self._z_lb
                v_ub[offset:offset + model.n_z] = self._z_ub
                offset += model.n_z
        # Some other variables must be added, depending on the approximation method used
        if self._nlp_options['integration_method'] == 'collocation':
            # ip: states at the collocation points, zp: algebraic states at the collocation points.
            ip = np.resize(np.array([], dtype=ca.MX), (self._prediction_horizon, 1))
            zp = np.resize(np.array([], dtype=ca.MX), (self._prediction_horizon, 1))
            for ii in range(self._prediction_horizon):
                ip[ii, 0] = v[offset:offset + n_xik]
                v_guess[offset:offset + n_xik] = x_ik_guess
                v_lb[offset:offset + n_xik] = x_ik_lb
                v_ub[offset:offset + n_xik] = x_ik_ub
                if self._mixed_integer_flag:
                    self._discrete_variables_bool.extend([False] * n_xik)
                offset += n_xik
                if model.n_z > 0:
                    zp[ii, 0] = v[offset:offset + n_zik]
                    v_guess[offset:offset + n_zik] = z_ik_guess
                    v_lb[offset:offset + n_zik] = z_ik_lb
                    v_ub[offset:offset + n_zik] = z_ik_ub
                    offset += n_zik
        elif self._nlp_options['integration_method'] in ['rk', 'rk4', 'discrete', 'idas', 'cvodes']:
            zp = np.resize(np.array([], dtype=ca.MX), (self._prediction_horizon, 1))
            for ii in range(self._prediction_horizon):
                zp[ii, 0] = v[offset:offset + n_zik]
                v_guess[offset:offset + n_zik] = z_ik_guess
                v_lb[offset:offset + n_zik] = z_ik_lb
                v_ub[offset:offset + n_zik] = z_ik_ub
                offset += n_zik
        # Slack variables for the soft stage constraints (one shared slack over the horizon).
        if self.stage_constraint.is_soft:
            e_soft_stage = v[offset:offset + self.stage_constraint.size]
            self._e_soft_stage_ind = [j for j in range(offset, offset + self.stage_constraint.size)]
            v_lb[offset:offset + self.stage_constraint.size] = np.zeros(self.stage_constraint.size)
            v_ub[offset:offset + self.stage_constraint.size] = self.stage_constraint.max_violation
            v_guess[offset:offset + self.stage_constraint.size] = np.zeros(self.stage_constraint.size)
            if self._mixed_integer_flag:
                self._discrete_variables_bool.extend([False])
            offset += self.stage_constraint.size
        else:
            e_soft_stage = ca.MX.sym('e_soft_stage', 0)
        # Slack variables for the soft terminal constraints.
        if self.terminal_constraint.is_soft:
            e_soft_term = v[offset:offset + self.terminal_constraint.size]
            self._e_soft_term_ind = [j for j in range(offset, offset + self.terminal_constraint.size)]
            v_lb[offset:offset + self.terminal_constraint.size] = np.zeros(self.terminal_constraint.size)
            v_ub[offset:offset + self.terminal_constraint.size] = self.terminal_constraint.max_violation
            v_guess[offset:offset + self.terminal_constraint.size] = np.zeros(self.terminal_constraint.size)
            if self._mixed_integer_flag:
                self._discrete_variables_bool.extend([False])
            offset += self.terminal_constraint.size
        # Slack variables for the soft custom constraints.
        if self._custom_constraint_is_soft_flag:
            e_cus = v[offset:offset + self._custom_constraint_size]
            v_lb[offset:offset + self._custom_constraint_size] = np.zeros(self._custom_constraint_size)
            v_ub[offset:offset + self._custom_constraint_size] = self._custom_constraint_maximum_violation
            # NOTE(review): the guess is set to the *size* of the constraint, not zeros — this
            # looks like a slip (np.zeros(...) would be expected); confirm intended behavior.
            v_guess[offset:offset + self._custom_constraint_size] = self._custom_constraint_size
            offset += self._custom_constraint_size
        # Create an entry for every reference
        tv_stage_ref_list = []
        for ii in range(len(self.quad_stage_cost._trajectories_list)):
            for name in self.quad_stage_cost._trajectories_list[ii]['names']:
                if not isinstance(self.quad_stage_cost._trajectories_list[ii]['ref'], ca.SX) and not isinstance(
                        self.quad_stage_cost._trajectories_list[ii]['ref'], ca.MX):
                    tv_stage_ref_list.append(catools.entry(name + '_sr', shape=(1, self._prediction_horizon)))
        tv_term_ref_list = []
        for ii in range(len(self.quad_terminal_cost._trajectories_list)):
            for name in self.quad_terminal_cost._trajectories_list[ii]['names']:
                if not isinstance(self.quad_terminal_cost._trajectories_list[ii]['ref'], ca.SX) and not isinstance(
                        self.quad_terminal_cost._trajectories_list[ii]['ref'], ca.MX):
                    tv_term_ref_list.append(catools.entry(name + '_tr', shape=1))
        # Predefine parameters (those are fixed and not optimized)
        param_npl_mpc = catools.struct_symMX([catools.entry("tv_p", shape=(self._n_tvp, self._prediction_horizon)),
                                              catools.entry("c_p", shape=model.n_p - self._n_tvp),
                                              catools.entry('u_old',
                                                            shape=len(self.quad_stage_cost._ind_input_changes)),
                                              catools.entry('dt', shape=self._prediction_horizon),
                                              catools.entry('time', shape=1)] +
                                             tv_term_ref_list + tv_stage_ref_list)
        tv_p = param_npl_mpc['tv_p']
        c_p = param_npl_mpc['c_p']
        u_old = param_npl_mpc['u_old']
        # Concat all the time-varying trajectories
        tv_ref_sc = []
        for ii in range(len(self.quad_stage_cost._trajectories_list)):
            for name in self.quad_stage_cost._trajectories_list[ii]['names']:
                if self.quad_stage_cost._trajectories_list[ii]['ref'] is None:
                    tv_ref_sc.append(param_npl_mpc[name + '_sr'])
        if len(tv_ref_sc) > 0:
            tv_ref_sc = ca.vertcat(*tv_ref_sc)
        else:
            tv_ref_sc = ca.MX.sym('bla', (0, self._prediction_horizon))
        tv_ref_tc = []
        for ii in range(len(self.quad_terminal_cost._trajectories_list)):
            for name in self.quad_terminal_cost._trajectories_list[ii]['names']:
                if self.quad_terminal_cost._trajectories_list[ii]['ref'] is None:
                    tv_ref_tc.append(param_npl_mpc[name + '_tr'])
        if len(tv_ref_tc) > 0:
            tv_ref_tc = ca.vertcat(*tv_ref_tc)
        else:
            tv_ref_tc = ca.MX.sym('bla', 0)
        # Sampling times: fixed parameters normally; free decision variables for minimal time.
        t_ind = []
        if self._minimize_final_time_flag is False:
            _dt = param_npl_mpc['dt']
        else:
            _dt = v[offset:offset + self.prediction_horizon]
            t_ind = [j for j in range(offset, offset + self.prediction_horizon)]
            v_lb[offset:offset + self.prediction_horizon] = np.zeros(self.prediction_horizon)
            v_ub[offset:offset + self.prediction_horizon] = ca.inf * np.ones(self.prediction_horizon)
            v_guess[offset:offset + self.prediction_horizon] = self._sampling_interval * np.ones(
                self.prediction_horizon)
            offset += self.prediction_horizon
        # Constraint function for the NLP
        g = []
        g_lb = []
        g_ub = []
        J = 0
        ind_g = 0
        # get the current sampling time
        time = param_npl_mpc['time']
        # --- Main loop: propagate the dynamics and accumulate cost/constraints over the horizon ---
        for ii in range(self._prediction_horizon):
            x_ii = x[ii, 0]
            # Beyond the control horizon the last input is held constant.
            if ii < self._control_horizon:
                u_ii = u[ii, 0]
            if ii >= 1:
                u_old0 = [u_ii[jj] for jj in self.quad_stage_cost._ind_input_changes]
                u_old0 = ca.vertcat(*u_old0)
            else:
                u_old0 = u_old
            p_ii = self._rearrange_parameters(tv_p[:, ii], c_p)
            tv_ref_sc_ii = tv_ref_sc[:, ii]
            dt_ii = _dt[ii]
            if self._nlp_options['integration_method'] in ['idas', 'cvodes']:
                sol = int_dynamics_fun(x0=ca.vertcat(x_ii, zp[ii, 0]),
                                       p=ca.vertcat(u_ii, p_ii, u_old0, tv_ref_sc_ii, dt_ii))
                x_ii_1 = sol['xf']
                quad = sol['qf']
            elif self._nlp_options['integration_method'] in ['rk4', 'rk']:
                # NOTE(review): the positional arguments here do not obviously match the input
                # order of the 'rk'/'rk4' integrator Function defined above (z/p/references vs
                # p_ii/tv_ref_sc_ii/e_soft_stage) — verify against the function signature.
                [alg, x_ii_1, quad] = int_dynamics_fun(
                    time, dt_ii, x_ii, u_ii, zp[ii, 0], p_ii, tv_ref_sc_ii, e_soft_stage, u_old0)
                g.append(alg)
                g_lb.append(np.zeros(alg.size1()))
                g_ub.append(np.zeros(alg.size1()))
                if self._nlp_options['ipopt_debugger']:
                    self._g_indices['dynamics_collocation'].append([ind_g, ind_g + alg.size1()])
                    ind_g += alg.size1()
            elif self._nlp_options['integration_method'] == 'collocation':
                [g_coll, x_ii_1, quad] = int_dynamics_fun(
                    time, dt_ii, ip[ii, 0], x_ii, u_ii, zp[ii, 0], p_ii, e_soft_stage, tv_ref_sc_ii, u_old0)
                g.append(g_coll)
                g_lb.extend(gk_col_lb)
                g_ub.extend(gk_col_ub)
                if self._nlp_options['ipopt_debugger']:
                    self._g_indices['dynamics_collocation'].append([ind_g, ind_g + g_coll.size1()])
                    ind_g += g_coll.size1()
            elif self._nlp_options['integration_method'] == 'discrete':
                x_ii_1 = int_dynamics_fun(time, dt_ii, x_ii, u_ii, zp[ii, 0], p_ii)
                # Multiple-shooting continuity constraint.
                g.append(x[ii + 1, 0] - x_ii_1)
                g_lb.append(np.zeros(model.n_x))
                g_ub.append(np.zeros(model.n_x))
                if self._nlp_options['ipopt_debugger']:
                    self._g_indices['dynamics_multiple_shooting'].append([ind_g, ind_g + x_ii_1.shape[0]])
                    ind_g += x_ii_1.size1()
            # Add lagrange term
            # TODO check if call is necessary
            if self._lag_term_flag:
                if self._nlp_options['integration_method'] == 'discrete' or \
                        self._nlp_options['objective_function'] == 'discrete':
                    quad = self._lag_term_fun(time, x_ii, u_ii, zp[ii, 0], p_ii, tv_ref_sc_ii, u_old0)
                J += quad
            # Mayer (terminal) cost at the end of the horizon.
            if self._may_term_flag and ii == self._prediction_horizon - 1:
                J += self._may_term_fun(time + dt_ii, x_ii_1, tv_ref_tc)
            # Terminal constraints at the end of the horizon.
            if self.terminal_constraint.is_set and ii == self._prediction_horizon - 1:
                if self.terminal_constraint.is_soft:
                    residual = self._terminal_constraints_fun(time, x_ii, zp[ii, 0], p_ii, e_soft_term)
                    J += self.terminal_constraint.cost(e_soft_term)
                    g.append(residual)
                    g_lb.append([-ca.inf] * self.terminal_constraint.size * 2)
                    g_ub.append([ub for ub in self.terminal_constraint.ub])
                    g_ub.append([-lb for lb in self.terminal_constraint.lb])
                    if self._nlp_options['ipopt_debugger']:
                        self._g_indices['nonlin_term_const'].append(
                            [ind_g, ind_g + self.terminal_constraint.size * 2])
                        ind_g += self.terminal_constraint.size * 2
                else:
                    residual = self._terminal_constraints_fun(time, x_ii_1, zp[ii, 0], p_ii)
                    g.append(residual)
                    g_lb.append(self.terminal_constraint.lb)
                    g_ub.append(self.terminal_constraint.ub)
                    if self._nlp_options['ipopt_debugger']:
                        self._g_indices['nonlin_term_const'].append(
                            [ind_g, ind_g + residual.size1()])
                        ind_g += residual.size1()
            # Stage constraints at every interval boundary.
            if self.stage_constraint.is_set:
                if self.stage_constraint.is_soft:
                    residual = self._stage_constraints_fun(time, x_ii, u_ii, zp[ii, 0], p_ii, e_soft_stage)
                    J += self.stage_constraint.cost(e_soft_stage)
                    g.append(residual)
                    g_lb.append([-ca.inf] * self.stage_constraint.size * 2)
                    g_ub.append([ub for ub in self.stage_constraint.ub])
                    g_ub.append([-lb for lb in self.stage_constraint.lb])
                    if self._nlp_options['ipopt_debugger']:
                        self._g_indices['nonlin_stag_const'].append(
                            [ind_g, ind_g + residual.size1()])
                        ind_g += residual.size1()
                else:
                    residual = self._stage_constraints_fun(time, x_ii, u_ii, zp[ii, 0], p_ii)
                    g.append(residual)
                    g_lb.append(self.stage_constraint.lb)
                    g_ub.append(self.stage_constraint.ub)
                    if self._nlp_options['ipopt_debugger']:
                        self._g_indices['nonlin_stag_const'].append(
                            [ind_g, ind_g + residual.size1()])
                        ind_g += residual.size1()
            # update time in the horizon
            time += dt_ii
        # User-defined custom constraints on the whole decision vector.
        if self._custom_constraint_flag:
            if self._custom_constraint_is_soft_flag:
                # Fixed quadratic penalty on the custom-constraint slack.
                W = np.diag([10000] * self._custom_constraint_size)
                J += ca.mtimes(e_cus.T, ca.mtimes(W, e_cus))
                g.append(self._custom_constraint_fun(v, x_ind, u_ind) - e_cus)
                g_lb.append([-ca.inf] * self._custom_constraint_size)
                g_ub.append(self._custom_constraint_fun_ub)
                g.append(self._custom_constraint_fun(v, x_ind, u_ind) + e_cus)
                g_lb.append(self._custom_constraint_fun_lb)
                g_ub.append([ca.inf] * self._custom_constraint_size)
            else:
                g.append(self._custom_constraint_fun(v, x_ind, u_ind))
                g_lb.append(self._custom_constraint_fun_lb)
                g_ub.append(self._custom_constraint_fun_ub)
        if self._minimize_final_time_flag:
            # Force all the sampling time to be equal
            for kkk in range(self.prediction_horizon - 1):
                g.append(_dt[kkk] - _dt[kkk + 1])
                g_lb.append(0)
                g_ub.append(0)
            # Add the time to the objective function
            J += ca.sum1(_dt) * self._minimize_final_time_weight
        # Stack everything and store the finished problem data on the instance.
        g = ca.vertcat(*g)
        self._g_lb = ca.DM(ca.vertcat(*g_lb))
        self._g_ub = ca.DM(ca.vertcat(*g_ub))
        self._v0 = ca.DM(v_guess)
        self._v_lb = ca.DM(v_lb)
        self._v_ub = ca.DM(v_ub)
        self._u_ind = u_ind
        self._x_ind = x_ind
        self._dt_ind = t_ind
        self._J = J
        self._v = v
        self._param_npl_mpc = param_npl_mpc
        self._g = g
        self._nlp_setup_done = True
        self._n_v = n_v
    if self._nlp_options['ipopt_debugger']:
        # Adds the callback debugger.
        debugger = IpoptDebugger('ipopt_debugger', self._n_v, self._g.shape[0], 0, 0, self._x_ind, self._u_ind)
        self.debugger = debugger
        self._nlp_opts.update({'iteration_callback': debugger})
    # Create the solver for the assembled problem.
    nlp_dict = {'f': self._J, 'x': self._v, 'p': self._param_npl_mpc, 'g': self._g}
    if self._solver_name in self._solver_name_list_nlp:
        solver = ca.nlpsol('solver', self._solver_name, nlp_dict, self._nlp_opts)
    elif self._solver_name in self._solver_name_list_qp:
        solver = ca.qpsol('solver', self._solver_name, nlp_dict, self._nlp_opts)
    else:
        raise ValueError(
            f"The solver {self._solver_name} does no exist. The possible solver are {self._solver_name_list}."
        )
    self._solver = solver
[docs] def setup(self, options=None, solver_options=None) -> None:
"""
Sets up the corresponding optimization problem (OP) of the MPC. This must be run before attempting to solve
the MPC.
:param options: Options for MPC. See documentation.
:type options: dict
:param solver_options: Dictionary with options for the optimizer. These options are solver specific. Refer to
the CasADi Documentation https://web.casadi.org/python-api/#nlp
:type solver_options: dict
:return: None
"""
self._setup(options=options, solver_options=solver_options)
def return_prediction(self):
    """
    Returns the mpc prediction.

    Predicted states and inputs are rescaled back to physical units; the sampling times are
    returned only for problems where they are free decision variables (minimal time).

    :return: x_pred, u_pred, dt_pred (dt_pred is None for fixed sampling times)
    """
    if self._nlp_solution is None:
        warnings.warn("There is still no mpc solution available. Run mpc.optimize() to get one.")
        return None, None, None
    sol = self._nlp_solution['x']
    x_pred = np.zeros((self._model.n_x, self._prediction_horizon + 1))
    u_pred = np.zeros((self._model.n_u, self._control_horizon))
    for k in range(self._prediction_horizon + 1):
        # Undo the internal scaling to recover physical state values.
        x_pred[:, k] = np.asarray(sol[self._x_ind[k]]).squeeze() * self._x_scaling
    for k in range(self._control_horizon):
        u_pred[:, k] = np.asarray(sol[self._u_ind[k]]).squeeze() * self._u_scaling
    if self._dt_ind:
        dt_pred = np.zeros(self._prediction_horizon)
        for k in range(self._prediction_horizon):
            dt_pred[k] = np.asarray(sol[self._dt_ind[k]]).squeeze()
    else:
        dt_pred = None
    return x_pred, u_pred, dt_pred
def get_reference(self, time):
    """
    Returns reference values at a given time. It works for path-following, trajectory-tracking
    and reference-tracking problems. For the reference-tracking problem it always returns a
    (constant) reference.

    :param time: time at which (possibly time-varying) references are evaluated
    :return: list of dictionaries with keys 'type', 'value' and 'ind'
    :rtype: list
    :raises ValueError: if a trajectory-tracking term has no trajectory attached
    """
    references = []
    # Constant (set-point) references are returned as-is.
    for ref in self.quad_stage_cost._references_list:
        value = ref['ref']
        references.append({'type': ref['type'], 'value': value, 'ind': ref['ind']})
    # Trajectory references: either a symbolic function of time or a sampled series.
    for ref in self.quad_stage_cost._trajectories_list:
        traj_i = ref['ref']
        if traj_i is not None:
            if isinstance(traj_i, ca.SX):
                # In this case the trajectory is given as a function, which has to be evaluated at
                # the current time
                value = ca.DM(ca.substitute(traj_i, self.time_var, time))
            else:
                # In this case, the trajectory is just a series of values
                value = traj_i[self.n_iterations]
            references.append({'type': ref['type'], 'value': value, 'ind': ref['ind']})
        else:
            raise ValueError("You did not supply a reference trajectory for the trajectory tracking problem."
                             "Use set_trajectory to set it up or add a function trajectory in the cost function."
                             )
    # Path references: evaluate the path expression at the current optimal path parameter(s).
    # BUGFIX: the original loop discarded the list item, referenced 'path_i' before assignment
    # (NameError), accessed a non-existent 'self._controller' attribute, and never stored the
    # result; any path-following problem would have crashed here.
    for ref in self.quad_stage_cost._paths_list:
        path_i = ref['ref']
        for i, path_var in enumerate(self._paths_var_list):
            theta = path_var['theta']
            # NOTE(review): the solution key 'thetapf{i}' is carried over from the original
            # code — confirm it matches the naming used when the solution object is populated.
            path_i = ca.substitute(path_i, theta, self.solution[f'thetapf{i}'][0])
        references.append({'type': ref['type'], 'value': path_i, 'ind': ref['ind']})
    return references
@property
def obj_fun(self):
    """
    Objective function of the underlying NLP (MX expression).

    :raises AttributeError: if the NLP has not been set up yet
    :return: Objective function (MX)
    """
    if self._nlp_setup_done is not False:
        return self._J
    raise AttributeError(
        "You need to setup the NLP by running mpc.setup() method before accessing the objective function."
    )
@property
def prediction_horizon(self):
    """
    Length of the prediction horizon (number of control intervals).

    :return: the prediction horizon
    """
    return self._prediction_horizon

@prediction_horizon.setter
def prediction_horizon(self, arg):
    if isinstance(arg, int) and arg >= 1:
        self._prediction_horizon = arg
        self._prediction_horizon_is_set = True
    else:
        # Fixed typo in the error message ("numer" -> "number")
        raise ValueError("The horizon number must be a positive nonzero integer")
@property
def control_horizon(self):
    """
    Length of the control horizon (number of free input moves).

    :return: the control horizon
    """
    return self._control_horizon

@control_horizon.setter
def control_horizon(self, arg):
    if isinstance(arg, int) and arg >= 1:
        self._control_horizon = arg
        self._control_horizon_is_set = True
    else:
        # Fixed typo in the error message ("numer" -> "number")
        raise ValueError("The horizon number must be a positive nonzero integer")
@property
def n_of_path_vars(self):
    """
    Number of path variables used for path following.

    :return: length of the internal path-variable list
    """
    path_vars = self._paths_var_list
    return len(path_vars)
@property
def n_tvp(self):
    """
    Number of time-varying parameters of the problem.

    :return: the stored count of time-varying parameters
    """
    return self._n_tvp
@property
def time_var(self):
    """
    Time variable used for time-dependent trajectories.

    :return: the stored time variable
    """
    return self._time_var
class LMPC(Controller, DynamicOptimization):
    """Class for Linear Model Predictive Control.

    Expects a linear, discrete-time model (otherwise a TypeError is raised in the
    constructor). The optimal control problem is condensed into a QP in :meth:`setup`.
    """
    def __init__(self, model, id=None, name=None, plot_backend='bokeh', use_sx=True):
        """Constructor method"""
        # Copy the steady state values. Because they will get lost after the parent constructor
        self._steady_state = model._steady_state
        # NOTE(review): unlike NMPC, `use_sx` and `stats` are not forwarded to the parent
        # constructor here — confirm whether that is intended.
        super().__init__(model, id=id, name=name, plot_backend=plot_backend)
        if not model.is_linear():
            raise TypeError("The model must be linear. Use the NMPC class instead.")
        if not model.discrete:
            raise TypeError("The model not discrete-time. Please run model.discretize() before or build "
                            "directl a discrete model.")
        # Flags tracking which parts of the problem have been configured
        self._may_term_flag = False
        self._lag_term_flag = False
        self._prediction_horizon_is_set = False
        self._control_horizon_is_set = False
        self._change_input_term_flag = False
        self._mixed_integer_flag = False
        self._trajectory_following_flag = False
        self._mpc_options_is_set = False
        self._paths_var_list = []
        # Internal clock and iteration counter, advanced in optimize()
        self._time = 0
        self._n_iterations = 0
        # Initialize the costs
        self._time_var = []
        self._lag_term = 0
        self._may_term = 0
        self._minimize_final_time_flag = False
        self._time_varying_parameters_ind = []
        # Box constraints (empty until set_box_constraints() is called)
        self.x_ub = []
        self.x_lb = []
        self.u_ub = []
        self.u_lb = []
        self.time_varying_parameters = []
        # Scaling vectors (empty until set_scaling() is called)
        self._x_scaling = []
        self._u_scaling = []
        self._y_scaling = []
        self._prediction_horizon = []
        self._control_horizon = []
        self._e_soft_stage_ind = []
        self._e_term_stage_ind = []
        # In some cases, for example path following with interpolated path using spline, the objective
        # function needs to be defined in MX variables instead of SX, because the CasADi spline interpolation
        # is defined only in MX functions.
        self._use_sx = use_sx
        # Save dimensions of the model. This can be useful because the MPC can change these later, but we still need
        # them
        self._n_x = deepcopy(model.n_x)
        self._n_u = deepcopy(model.n_u)
        self._n_y = deepcopy(model.n_y)
        self._n_z = deepcopy(model.n_z)
        self._n_p = deepcopy(model.n_p)
        self._n_m = 1
        self._options = {}
        # Initialize weighting matrices
        self._P = None
        self._Q = None
        self._R = None
        # default solver
        self._solver_name = 'qpoases'
def _update_type(self) -> None:
"""
:return:
"""
self._type = 'LMPC'
def _parse_tvp_parameters_values(self, tvp):
    """
    Collect the time-varying parameter (tvp) values over the prediction horizon.

    Fills ``self._time_varying_parameters_horizon`` (one row per tvp, one column per
    prediction step), either from the ``tvp`` dict passed to :meth:`optimize` or from the
    values previously stored via ``set_time_varying_parameters``.

    :param tvp: dict mapping parameter names to sequences of values (each at least as
        long as the prediction horizon), or None to fall back to the stored values
    :raises TypeError: if a passed sequence is shorter than the prediction horizon
    :raises ValueError: if no tvp values are available at all
    :return:
    """
    # Get horizon of tvp values
    if tvp is not None:
        self._time_varying_parameters_horizon = ca.DM.zeros(self._n_tvp, self.prediction_horizon)
        tvp_counter = 0
        for key, value in tvp.items():
            if len(value) < self.prediction_horizon:
                raise TypeError(
                    "When passing time-varying parameters, you need to pass a number of values at least "
                    f"as long as the prediction horizon. The parameter {key} has {len(value)} values but the MPC "
                    f"has a prediction horizon length of {self._prediction_horizon}."
                )
            value = tvp[key]
            # Only the first `prediction_horizon` values are used
            self._time_varying_parameters_horizon[tvp_counter, :] = value[:self._prediction_horizon]
            tvp_counter += 1
    elif self._time_varying_parameters_values is not None:
        # Fall back to values stored via set_time_varying_parameters()
        self._get_tvp_parameters_values()
    else:
        raise ValueError(
            f"Mate, I know there are {self._n_tvp} time varying parameters but you did not pass me any."
            f"Please provide me with the values of the parameters, either to the optimize() method or to the "
            f"set_time_varying_parameters() method."
        )
def _save_predictions(self, cp, tvp):
    """
    Store prediction in the solution object. Necessary for plotting.

    :param cp: constant parameter values
    :param tvp: time-varying parameter values over the horizon (one column per step)
    :return:
    """
    # Save prediction in solution
    x_pred, u_pred = self.return_prediction()
    if self._n_tvp > 0:
        # Assemble the full parameter vector (constant + time-varying) for every step
        p = self._rearrange_parameters_numeric(tvp[:, 0], cp)
        for ii in range(1, self._prediction_horizon):
            p = ca.horzcat(p, self._rearrange_parameters_numeric(tvp[:, ii], cp))
    else:
        p = cp
    # Save solution in the solution class
    self.solution.add('x', x_pred[:self._n_x, :])
    self.solution.add('u', u_pred[:self._n_u, :])
    self.solution.add('p', p)
    # Time grid of the prediction, starting at the current internal clock
    t_pred = np.linspace(self._time, self._time + (self.prediction_horizon) * self.sampling_interval,
                         self.prediction_horizon + 1)
    self.solution.add('t', ca.DM(t_pred).T)
def _scale_problem(self):
    """
    Scale bounds, model equations and cost matrices with the user-supplied scaling vectors.

    :return:
    """
    # ... bounds ...
    self._u_ub = scale_vector(self._u_ub, self._u_scaling)
    self._u_lb = scale_vector(self._u_lb, self._u_scaling)
    self._x_ub = scale_vector(self._x_ub, self._x_scaling)
    self._x_lb = scale_vector(self._x_lb, self._x_scaling)
    # ... ode ...
    self._model.scale(self._u_scaling, id='u')
    self._model.scale(self._x_scaling, id='x')
    # ... cost matrices ...
    # NOTE(review): both factors are the transposed scaling vector; whether this yields
    # diag(s) @ M @ diag(s) depends on how the product broadcasts for the matrix types
    # used here — confirm against the expected scaling of a quadratic cost.
    if self.Q is not None:
        self.Q = np.array(self._x_scaling).T * self.Q * np.array(self._x_scaling).T
    if self.P is not None:
        self.P = np.array(self._x_scaling).T * self.P * np.array(self._x_scaling).T
    if self.R is not None:
        self.R = np.array(self._u_scaling).T * self.R * np.array(self._u_scaling).T
def _check_mpc_is_well_posed(self):
"""
:return:
"""
if self.Q is None and self.R is None and self.P is None:
raise ValueError("You need to define at least one of the weighting matrices before setting up the LMPC.")
if not self._prediction_horizon_is_set:
raise ValueError("You must set a prediction horizon length before")
if not self._control_horizon_is_set:
raise ValueError("You must set a control horizon length before.")
if self._Q is not None:
if self._Q.shape != (self._n_x, self._n_x):
raise ValueError(
f"The state matrix Q must be of dimension {(self._n_x, self._n_x)}, you have dimension {self._Q.shape}")
if self._P is not None:
if self._P.shape != (self._n_x, self._n_x):
raise ValueError(
f"The state matrix P must be of dimension {(self._n_x, self._n_x)}, you have dimension {self._P.shape}")
if self._R is not None:
if self._R.shape != (self._n_u, self._n_u):
raise ValueError(
f"The input matrix Q must be of dimension {(self._n_u, self._n_u)}, you have dimension {self._R.shape}")
# # Check tvp and initialize horizon of tvp values
# if self._time_varying_parameters_values is not None:
# self._time_varying_parameters_horizon = ca.DM.zeros((self._n_tvp), self.prediction_horizon)
# tvp_counter = 0
# for key, value in self._time_varying_parameters_values.items():
# if len(value) < self.prediction_horizon:
# raise TypeError(
# f"When passing time-varying parameters, you need to pass a number of values at least "
# f"as long as the prediction horizon. The parameter {key} has {len(value)} values but the MPC "
# f"has a prediction horizon length of {self._prediction_horizon}."
# )
#
# value = self._time_varying_parameters_values[key]
# self._time_varying_parameters_horizon[tvp_counter, :] = value[0:self._prediction_horizon]
# tvp_counter += 1
def _save_references(self):
    """
    Store the (constant, zero) state and input references in the solution object.

    The LMPC regulates to the origin, so the references are always zero.

    :return:
    """
    # Removed dead stores: x_ref/u_ref were first filled with NaN and then immediately
    # overwritten with zeros.
    x_ref = ca.repmat(ca.DM.zeros(self._n_x).T, self.horizon).T
    u_ref = ca.repmat(ca.DM.zeros(self._n_u).T, self.horizon).T
    self.solution.add('u_ref', u_ref)
    self.solution.add('x_ref', x_ref)
[docs] def setup(self, options=None, solver_options={}, solver='qpoases'):
"""
:param options:
:param solver_options:
:param :
:return:
"""
self._check_mpc_is_well_posed()
if not self._scaling_is_set:
self.set_scaling()
if not self._time_varying_parameters_is_set:
self.set_time_varying_parameters()
if not self._box_constraints_is_set:
self.set_box_constraints()
if not self._nlp_solver_is_set:
self.set_nlp_solver(solver)
if not self._sampling_time_is_set:
self.set_sampling_interval()
self._solver_options = solver_options
self._populate_solution()
self._x_lb_orig = deepcopy(self._x_lb)
self._x_ub_orig = deepcopy(self._x_ub)
self._u_lb_orig = deepcopy(self._u_lb)
self._u_ub_orig = deepcopy(self._u_ub)
# Scale problem
self._scale_problem()
# Predefine parameters (those are fixed and not optimized)
param_lmpc = ca.SX.sym("tv_p", self._n_tvp, self._horizon)
n_x = self._n_x
n_u = self._n_u
A = self._model.state_matrix
B = self._model.input_matrix
Q = self.Q
R = self.R
P = self.P
if Q is None:
Q = ca.DM.zeros(self._n_x, self._n_x)
if P is None:
P = ca.DM.zeros(self._n_x, self._n_x)
if R is None:
R = ca.DM.zeros(self._n_u, self._n_u)
dim_states = n_x * (self._horizon + 1)
dim_control = n_u * self._horizon
# Build Aeq
# aux1 = np.eye(self._horizon, self._horizon + 1)
# Abar1 = ca.kron(aux1, A)
if self._n_tvp > 0:
Abar1 = ca.substitute(A, self._model.p[self._time_varying_parameters_ind],
param_lmpc[:, 0])
for i in range(1, self._horizon):
Abar1 = ca.diagcat(Abar1, ca.substitute(A, self._model.p[self._time_varying_parameters_ind],
param_lmpc[:, i]))
else:
aux1 = np.eye(self._horizon, self._horizon)
Abar1 = ca.kron(aux1, A)
# Add a column of zero matrices
Abar1 = ca.horzcat(Abar1, ca.DM.zeros(self._horizon * self._n_x, self._n_x))
aux2 = np.zeros((self._horizon, self._horizon + 1))
# Save indices variables
u_ind = []
x_ind = []
offset_x = 0
x_ind.append([j for j in range(offset_x, offset_x + n_x)])
offset_x += n_x
offset_u = (self._horizon + 1) * n_x
for i in range(self._horizon):
aux2[i, i + 1] = -1
x_ind.append([j for j in range(offset_x, offset_x + n_x)])
u_ind.append([j for j in range(offset_u, offset_u + n_u)])
offset_x += n_x
offset_u += n_u
Abar2 = ca.kron(aux2, ca.DM.eye(n_x))
# aux3 = np.eye(self._horizon, self._horizon)
if self._n_tvp > 0:
Abar3 = ca.substitute(B, self._model.p[self._time_varying_parameters_ind],
param_lmpc[:, 0])
for i in range(1, self._horizon):
Abar3 = ca.diagcat(Abar3, ca.substitute(B, self._model.p[self._time_varying_parameters_ind],
param_lmpc[:, i]))
else:
aux3 = np.eye(self._horizon, self._horizon)
Abar3 = ca.kron(B, aux3)
# Add constraints for the ode
Aeq = ca.horzcat(Abar1 + Abar2, Abar3)
# generate parameters
beq = ca.kron(ca.DM.zeros(self._model.n_x), ca.DM.ones(self.horizon))
# Add constraints for the polytope constraints
# TODO, they should be added to A
H_states = ca.kron(ca.DM.eye(self._horizon), Q)
H_states = ca.diagcat(H_states, P)
H_control = ca.kron(ca.DM.eye(self._horizon), R)
H = ca.diagcat(H_states, H_control)
g = ca.DM.zeros((dim_states + dim_control, 1))
lb_states = ca.kron(ca.DM.ones((self._horizon + 1, 1)), self._x_lb)
ub_states = ca.kron(ca.DM.ones((self._horizon + 1, 1)), self._x_ub)
lb_control = ca.kron(ca.DM.ones((self._horizon, 1)), self._u_lb)
ub_control = ca.kron(ca.DM.ones((self._horizon, 1)), self._u_ub)
v_lb = ca.vertcat(lb_states, lb_control)
v_ub = ca.vertcat(ub_states, ub_control)
qp = {
'h': H.sparsity(),
'a': Aeq.sparsity(),
}
# TODO move this check of the solver into the setup_solver method
if self._solver_name in self._solver_name_list_qp:
# self._nlp_opts.update({'p': param_lmpc})
solver = ca.conic("solver", self._solver_name, qp, self._solver_options)
# x = ca.SX.sym('x', H.shape[0])
# qp = {'x':x, 'f': x.T@H@x, 'g':Aeq@x, 'p':param_npl_mpc }
# solver = ca.qpsol("solver", self._solver_name, qp)
elif self._solver_name == 'muaompc':
try:
from ..embedded.muaompc import setup_solver
except ImportError as err:
message = f"{err}."
message += "\nTo use nlp_solver='muaompc' first install muaompc."
message += " Try:\n pip install muaompc"
raise ImportError(message)
solver = setup_solver(self)
else:
raise ValueError(
f"The solver {self._solver_name} does no exist. The possible solver are {self._solver_name_list_qp}."
)
self._solver = solver
self._H = ca.DM(H)
self._g = g
self._Ad = Aeq
self._Ad_lb = beq
self._Ad_ub = beq
self._v_lb = v_lb
self._v_ub = v_ub
self._x_ind = x_ind
self._u_ind = u_ind
self._param_lmpc = param_lmpc
def optimize(self, x0, tvp=None, cp=None):
    """
    Solve the LMPC problem for the current initial state.

    Substitutes sampling time, equilibrium points and parameter values into the
    pre-built QP matrices and calls the QP solver.

    :param x0: current (initial) state value
    :param tvp: dict of time-varying parameter values (required if the model has tvps)
    :param cp: values of the constant model parameters (required if the model has any)
    :raises ValueError: if required constant parameters are missing
    :return: the first optimal input, rescaled to physical units
    """
    # TODO: substitute all the values of all the variables remaining in the matrices
    if self._solver_name == 'muaompc':
        # The embedded solver wraps the whole problem; nothing to substitute here
        return self._solver(x0)
    if self._model.n_p - self._n_tvp != 0:
        if cp is not None:
            cp = check_and_wrap_to_DM(cp)
        if cp is None or cp.size1() != self._model.n_p - self._n_tvp:
            raise ValueError(
                f"The model has {self._model.n_p - self._n_tvp} constant parameter(s): "
                f"{self._model.parameter_names}. You must pass me the value of these before running the "
                f"optimization to the 'cp' parameter."
            )
    else:
        if cp is not None:
            warnings.warn(
                "You are passing a parameter vector in the optimizer, but the model has no defined parameters. "
                "I am ignoring the vector."
            )
    x0 = check_and_wrap_to_DM(x0)
    if self._n_tvp > 0:
        self._parse_tvp_parameters_values(tvp)
    Ad = self._Ad
    Ad_ub = self._Ad_ub
    Ad_lb = self._Ad_lb
    # Substitute the sampling time into the constraint matrices
    Ad = ca.substitute(Ad, self._model.dt, self._sampling_interval)
    Ad_ub = ca.substitute(Ad_ub, self._model.dt, self._sampling_interval)
    Ad_lb = ca.substitute(Ad_lb, self._model.dt, self._sampling_interval)
    # TODO check that these points exists/have been provided by the user
    # Substitute the equilibrium points
    if self._model.is_linearized():
        Ad = ca.substitute(Ad, self._model.x_eq, self._steady_state['x'])
        Ad = ca.substitute(Ad, self._model.u_eq, self._steady_state['u'])
    if cp is not None:
        # Constant parameters are all model parameters that are not time-varying
        ind_cp_par = [i for i in range(self._model.n_p) if i not in self._time_varying_parameters_ind]
        Ad = ca.substitute(Ad, self._model.p[ind_cp_par], cp)
        Ad_ub = ca.substitute(Ad_ub, self._model.p[ind_cp_par], cp)
        Ad_lb = ca.substitute(Ad_lb, self._model.p[ind_cp_par], cp)
    # Fix the (scaled) initial state by pinning its box constraints to x0
    self._v_lb[self._x_ind[0][0:self._n_x]] = x0 / ca.DM(self._x_scaling[0:self._n_x])
    self._v_ub[self._x_ind[0][0:self._n_x]] = x0 / ca.DM(self._x_scaling[0:self._n_x])
    if self._n_tvp:
        # Substitute the time-varying parameter values for every step of the horizon
        for i in range(self._horizon):
            Ad = ca.substitute(Ad, self._param_lmpc[:, i], self._time_varying_parameters_horizon[:, i])
            Ad_ub = ca.substitute(Ad_ub, self._param_lmpc[:, i], self._time_varying_parameters_horizon[:, i])
            Ad_lb = ca.substitute(Ad_lb, self._param_lmpc[:, i], self._time_varying_parameters_horizon[:, i])
    Ad = ca.DM(Ad)
    Ad_ub = ca.DM(Ad_ub)
    Ad_lb = ca.DM(Ad_lb)
    sol = self._solver(h=self._H, g=self._g, a=Ad, lbx=self._v_lb, ubx=self._v_ub, lba=Ad_lb, uba=Ad_ub)
    self._nlp_solution = sol
    u_opt = sol['x'][self._u_ind[0]] * np.array(self._u_scaling)
    # Reset the old solution
    self.reset_solution()
    # Save predictions in the solution object. Done for plotting purposes.
    self._save_predictions(cp, self._time_varying_parameters_horizon)
    # Update the time clock - Useful for time-varying systems and references.
    self._time += self.sampling_interval
    # Save reference (in this case always zero)
    self._save_references()
    # Iteration counter
    self._n_iterations += 1
    return u_opt
def set_stage_constraints(self, stage_constraint=None, lb=None, ub=None, is_soft=False, max_violation=ca.inf,
                          weight=None, name='stage_constraint'):
    """Not supported by the linear MPC formulation; always raises NotImplementedError."""
    raise NotImplementedError(f"The method {self.set_stage_constraints.__name__} is not available for LMPC.")
def set_custom_constraints_function(self, fun=None, lb=None, ub=None, soft=False, max_violation=ca.inf):
    """Not supported by the linear MPC formulation; always raises NotImplementedError."""
    raise NotImplementedError(
        f"The method {self.set_custom_constraints_function.__name__} is not available for LMPC.")
def set_initial_guess(self, x_guess=None, u_guess=None, z_guess=None):
    """Not supported by the linear MPC formulation; always raises NotImplementedError."""
    raise NotImplementedError(
        f"The method {self.set_initial_guess.__name__} is not available for LMPC.")
def return_prediction(self):
    """
    Return the MPC prediction of the last solved problem.

    :return: tuple ``(x_pred, u_pred)`` with the (unscaled) predicted state and input
        trajectories, or ``(None, None)`` if no solution is available yet
    """
    if self._nlp_solution is not None:
        x_pred = np.zeros((self._model.n_x, self._horizon + 1))
        u_pred = np.zeros((self._model.n_u, self._control_horizon))
        # Undo the internal scaling while extracting the trajectories
        for ii in range(self._horizon + 1):
            x_pred[:, ii] = np.asarray(self._nlp_solution['x'][self._x_ind[ii]]).squeeze() * self._x_scaling
        for ii in range(self._control_horizon):
            u_pred[:, ii] = np.asarray(self._nlp_solution['x'][self._u_ind[ii]]).squeeze() * self._u_scaling
        return x_pred, u_pred
    else:
        warnings.warn("There is still no mpc solution available. Run mpc.optimize() to get one.")
        # Fixed: previously returned a 3-tuple here while the success path returns a
        # 2-tuple; also removed the unused dt_pred array.
        return None, None
@property
def prediction_horizon(self):
    """
    Prediction horizon length of the LMPC.

    :return: the stored horizon
    """
    return self._horizon
@property
def P(self):
    """Terminal state weighting matrix of the quadratic cost."""
    return self._P

@P.setter
def P(self, arg):
    # Wrap lists/arrays into a CasADi DM before storing
    self._P = check_and_wrap_to_DM(arg)
@property
def Q(self):
    """Stage state weighting matrix of the quadratic cost."""
    return self._Q

@Q.setter
def Q(self, arg):
    # Wrap lists/arrays into a CasADi DM before storing
    self._Q = check_and_wrap_to_DM(arg)
@property
def R(self):
    """Stage input weighting matrix of the quadratic cost."""
    return self._R

@R.setter
def R(self, arg):
    # Wrap lists/arrays into a CasADi DM before storing
    self._R = check_and_wrap_to_DM(arg)
class SMPC(NMPC):
    """Class for Stochastic Nonlinear Model Predictive Control.

    Transforms a stochastic problem (deterministic model + Gaussian-process disturbance
    model) into an equivalent deterministic NMPC on a surrogate model whose state is
    extended by the entries of the state covariance matrix.
    """
    def __init__(self, det_model, stoch_model, B, id=None, name=None, plot_backend=None, use_sx=True,
                 stats=False, Kgain=None):
        """Constructor method"""
        # Save dimension of original (stochastic) model
        self._n_x_s = det_model.n_x
        self._n_u_s = det_model.n_u
        self._n_y_s = det_model.n_y
        self._n_p_s = det_model.n_p
        self._n_z_s = det_model.n_z
        if Kgain is not None:
            # Ancillary controller gain fixed at construction time
            Kgain = check_and_wrap_to_DM(Kgain)
            self._Kgain_is_set = True
        else:
            self._Kgain_is_set = False
        # First transform the problem into a deterministic problem
        model_c, Kx, Kgain = self._create_deterministic_surrogate(det_model, stoch_model, B, Kgain=Kgain)
        self.Kx = Kx
        self.Kgain = Kgain
        model_c.setup(dt=1)  # TODO put the dt from the solution
        super().__init__(model_c, id=id, name=name, plot_backend=plot_backend, stats=stats, use_sx=use_sx)
        # Initialize chance constraint probabilities
        self._x_ub_p = None
        self._x_lb_p = None
        self._u_ub_p = None
        self._u_lb_p = None
        self._y_ub_p = None
        self._y_lb_p = None
        self._z_ub_p = None
        self._z_lb_p = None
        # Initialize the original (unexpanded) constraints; default is unbounded
        self.x_ub_s = ca.inf * ca.DM.ones(det_model.n_x)
        self.x_lb_s = -ca.inf * ca.DM.ones(det_model.n_x)
        self.u_ub_s = ca.inf * ca.DM.ones(det_model.n_u)
        self.u_lb_s = -ca.inf * ca.DM.ones(det_model.n_u)
        self.y_ub_s = ca.inf * ca.DM.ones(det_model.n_y)
        self.y_lb_s = -ca.inf * ca.DM.ones(det_model.n_y)
        self.z_ub_s = ca.inf * ca.DM.ones(det_model.n_z)
        self.z_lb_s = -ca.inf * ca.DM.ones(det_model.n_z)
        self._smpc_options = None
        self._smpc_options_is_set = False
def _create_deterministic_surrogate(self, det_model, gps, Bw, Kgain=None):
    """
    Create the surrogate deterministic model of the stochastic problem.

    The surrogate propagates the state mean through the known dynamics plus the GP mean,
    and appends ``n_x**2`` extra states for the state covariance matrix, propagated via a
    first-order (Jacobian-based) approximation.

    :param det_model: deterministic part of the model
    :param gps: GaussianProcess (or list of them) describing the unknown dynamics
    :param Bw: matrix mapping the GP outputs (and noise) into the state equations
    :param Kgain: gain of the ancillary feedback controller; if None, its entries are
        added as free parameters of the surrogate model
    :return: tuple ``(model_c, Kx, Kgain)`` with the surrogate model, the symbolic state
        covariance matrix and the (possibly symbolic) controller gain
    """
    model_c = Model(plot_backend='matplotlib', discrete=True)
    # Mirror states/inputs/parameters of the original model as mean variables
    mu_x = model_c.set_dynamical_states([f'{i}' for i in det_model.dynamical_state_names])
    mu_u = model_c.set_inputs([f'{i}' for i in det_model.input_names])
    mu_p = model_c.set_parameters(det_model.parameter_names)
    if mu_p.shape == (0, 0):
        mu_p.resize(0, 1)
    if Kgain is None:
        # Gain unknown: expose its entries as model parameters
        model_c.add_parameters([f'kgain_{i}' for i in range(det_model.n_x * det_model.n_u)])
        Kgain = ca.reshape(model_c.p[det_model.n_p:], model_c.n_u, model_c.n_x)
    jgps = []
    mu_ds = []
    Kd0s = []
    if isinstance(gps, hilo_mpc.gp.GaussianProcess):
        gps = [gps]
    for gp in gps:
        # Each GP only sees the subset of states it was trained on
        index = [det_model.dynamical_state_names.index(i) for i in gp.features]
        xxx = det_model.x[index]
        (y_pred_gp_ca, var_pred_gp_ca) = gp.predict(xxx)
        # Compute jacobian of GP
        jgp = ca.jacobian(y_pred_gp_ca, ca.vertcat(det_model.x, det_model.u))
        jgp = ca.substitute(jgp, det_model.x, mu_x)
        jgp = ca.substitute(jgp, det_model.u, mu_u)
        jgp = ca.substitute(jgp, det_model.p, mu_p)
        jgps.append(jgp)
        mu_d = ca.substitute(y_pred_gp_ca, det_model.x, mu_x)
        mu_d = ca.substitute(mu_d, det_model.u, mu_u)
        mu_d = ca.substitute(mu_d, det_model.p, mu_p)
        mu_ds.append(mu_d)
        # Get the variance
        Kd0 = ca.substitute(var_pred_gp_ca, det_model.x, mu_x)
        Kd0 = ca.substitute(Kd0, det_model.u, mu_u)
        Kd0 = ca.substitute(Kd0, det_model.p, mu_p)
        Kd0s.append(Kd0)
    mu_d = ca.vertcat(*mu_ds)
    jgps = ca.vertcat(*jgps)
    Kd0s = ca.diagcat(*Kd0s)
    # Create new ode based on the means
    ode = ca.substitute(det_model.ode, det_model.x, mu_x)
    ode = ca.substitute(ode, det_model.u, mu_u)
    ode = ca.substitute(ode, det_model.p, mu_p)
    # Substitute the dt with the new model dt. They are the same thing, but they need to
    # be the same CasADi object
    ode = ca.substitute(ode, det_model.dt, model_c.dt)
    ode = ode + ca.mtimes(Bw, mu_d)
    model_c.set_dynamical_equations(ode)
    # Jacobian of the known part - necessary for uncertainty propagation
    jode = ca.jacobian(det_model.ode, ca.vertcat(det_model.x, det_model.u))
    # Substitute the means
    jode = ca.substitute(jode, det_model.x, mu_x)
    jode = ca.substitute(jode, det_model.u, mu_u)
    jode = ca.substitute(jode, det_model.p, mu_p)
    # Define variances and covariances of states inputs, unknown part and noise - x = f(x,u) + Bd(g(x,u)+w)
    Kx = ca.SX.sym('kx', det_model.n_x, det_model.n_x)
    # Add the states that describe the evolution of the state covariance matrix
    model_c.add_dynamical_states(ca.reshape(Kx, det_model.n_x ** 2, 1))
    # Input covariance induced by the ancillary feedback u = K x
    Ku = ca.mtimes(ca.mtimes(Kgain, Kx), Kgain.T)
    Kxu = ca.mtimes(Kx, Kgain.T)
    Kux = Kxu.T
    Kz = ca.vertcat(
        ca.horzcat(Kx, Kxu),
        ca.horzcat(Kux, Ku)
    )
    Kd = Kd0s + ca.mtimes(ca.mtimes(jgps, Kz), jgps.T)
    Kzd = ca.mtimes(Kz, jgps.T)
    # FIXME should I add a noise covariance matrix to Kd here?
    bigK = ca.vertcat(
        ca.horzcat(Kz, Kzd),
        ca.horzcat(Kzd.T, Kd)
    )
    jodeBw = ca.horzcat(jode, Bw)
    # First-order covariance propagation: K+ = J * bigK * J^T
    ode_c = ca.mtimes(jodeBw, ca.mtimes(bigK, jodeBw.T))
    # Substitute the dt with the new model dt (same value, must be the same CasADi object)
    ode_c = ca.substitute(ode_c, det_model.dt, model_c.dt)
    model_c.add_dynamical_equations(ca.reshape(ode_c, det_model.n_x ** 2, 1))
    return model_c, Kx, Kgain
def _update_type(self) -> None:
"""
:return:
"""
self._type = 'SMPC'
def _get_chance_constraints(self):
    """
    Reformulate the state box constraints as individual chance constraints.

    Uses the probabilistic reachable set ('prs') reformulation: each bound is tightened
    by the inverse Gaussian CDF of its satisfaction probability times the predicted
    standard deviation (square root of the diagonal of the state covariance Kx).
    """
    # Note:
    # Only the diagonal of Kx is used because only box constraints are assumed. Other
    # kinds of constraints would need a different treatment.
    if self._nlp_options['chance_constraints'] == 'prs':
        if self._box_constraints_is_set:
            # NOTE(review): the lower-bound check compares against +inf; for lower bounds
            # `i != -ca.inf` looks intended — confirm.
            if any([i != ca.inf for i in self._x_lb]) or any([i != ca.inf for i in self._x_ub]):
                # Set state chance constraints (the 1e-8 keeps the sqrt well-defined at
                # zero variance)
                x_prob_ub = self._model.x[:self._n_x_s] + (
                    ca.sqrt(2) * erfinv(2 * np.array(self._x_ub_p) - 1)) * ca.sqrt(
                    ca.diag(self.Kx) + 1e-8)
                x_prob_lb = -self._model.x[:self._n_x_s] + (
                    ca.sqrt(2) * erfinv(2 * np.array(self._x_lb_p) - 1)) * ca.sqrt(
                    ca.diag(self.Kx) + 1e-8)
                self.stage_constraint.constraint = ca.vertcat(x_prob_ub, x_prob_lb)
                self.stage_constraint.ub = ca.vertcat(ca.DM(self.x_ub_s), -ca.DM(self.x_lb_s))
                self.stage_constraint.lb = -ca.inf * ca.DM.ones(2 * self._n_x_s)
                self.terminal_constraint.constraint = ca.vertcat(x_prob_ub, x_prob_lb)
                self.terminal_constraint.ub = ca.vertcat(ca.DM(self.x_ub_s), -ca.DM(self.x_lb_s))
                self.terminal_constraint.lb = -ca.inf * ca.DM.ones(2 * self._n_x_s)
def _sanity_check_probability_values(self, var, type):
if var is None:
return var
else:
var = check_and_wrap_to_list(var)
for i in var:
if i < 0 or i > 1:
raise TypeError(
f"The probabilities must be between 0 and 1. The variable time {type} has some values"
f" ouside this range.")
return var
def set_box_constraints(self, x_ub=None, x_lb=None, u_ub=None, u_lb=None, y_ub=None, y_lb=None, z_ub=None,
                        z_lb=None, *args, **kwargs):
    """Disabled for SMPC; box constraints must be set as chance constraints instead."""
    raise TypeError(
        "set_box_constraints is not available in stochastic MPC. Use 'set_box_chance_constraints' instead.")
def set_box_chance_constraints(self, x_ub=None, x_lb=None, u_ub=None, u_lb=None, y_ub=None, y_lb=None, z_ub=None,
                               z_lb=None, *args, **kwargs):
    """
    Set box chance constraints for the SMPC.

    The surrogate deterministic model carries ``n_x_s**2`` extra covariance states, so
    the bounds supplied by the user are extended with bounds for the covariance entries:
    diagonal entries (variances) get a lower bound of 0, everything else is unbounded.
    The satisfaction probability of each bound can be passed via kwargs ('x_ub_p',
    'x_lb_p', 'y_ub_p', 'y_lb_p', 'z_ub_p', 'z_lb_p'); the default is 0.954 per entry.

    :param x_ub: upper bounds on the (original) states, as a list
    :param x_lb: lower bounds on the (original) states, as a list
    :param u_ub: upper bounds on the inputs
    :param u_lb: lower bounds on the inputs
    :param y_ub: upper bounds on the measurements
    :param y_lb: lower bounds on the measurements
    :param z_ub: upper bounds on the algebraic variables
    :param z_lb: lower bounds on the algebraic variables
    """
    # TODO: this can be simplified by looping over all bounds instead of writing everything again
    # NOTE: once issue #3 is solved, the following lines should be not necessary anymore
    if x_ub is not None:
        self.x_ub_s = x_ub
        var_x_ub = np.eye(self._n_x_s)
        var_x_ub[var_x_ub == 0] = ca.inf
        var_x_ub[var_x_ub == 1] = ca.inf
        var_x_ub = var_x_ub.flatten().tolist()
        x_ub = x_ub + var_x_ub
        # Get probability of constraint satisfaction
        self._x_ub_p = self._sanity_check_probability_values(kwargs.get('x_ub_p', np.ones(self._n_x_s) * 0.954), 'x_ub')
    if x_lb is not None:
        self.x_lb_s = x_lb
        var_x_lb = np.eye(self._n_x_s)
        var_x_lb[var_x_lb == 0] = -ca.inf
        var_x_lb[var_x_lb == 1] = 0
        var_x_lb = var_x_lb.flatten().tolist()
        x_lb = x_lb + var_x_lb
        # Get probability of constraint satisfaction
        self._x_lb_p = self._sanity_check_probability_values(kwargs.get('x_lb_p', np.ones(self._n_x_s) * 0.954), 'x_lb')
    if y_ub is not None:
        self.y_ub_s = y_ub
        # Fixed copy-paste bug: previously np.eye(self._n_x_s), inconsistent with the
        # y_lb branch below
        var_y_ub = np.eye(self._n_y_s)
        var_y_ub[var_y_ub == 0] = ca.inf
        var_y_ub[var_y_ub == 1] = ca.inf
        var_y_ub = var_y_ub.flatten().tolist()
        y_ub = y_ub + var_y_ub
        # Get probability of constraint satisfaction
        self._y_ub_p = self._sanity_check_probability_values(kwargs.get('y_ub_p', np.ones(self._n_y_s) * 0.954), 'y_ub')
    if y_lb is not None:
        self.y_lb_s = y_lb
        var_y_lb = np.eye(self._n_y_s)
        var_y_lb[var_y_lb == 0] = -ca.inf
        var_y_lb[var_y_lb == 1] = 0
        var_y_lb = var_y_lb.flatten().tolist()
        y_lb = y_lb + var_y_lb
        # Get probability of constraint satisfaction
        # Fixed copy-paste bug: the result was previously stored in self._y_ub_p,
        # leaving self._y_lb_p unset
        self._y_lb_p = self._sanity_check_probability_values(kwargs.get('y_lb_p', np.ones(self._n_y_s) * 0.954), 'y_lb')
    if z_ub is not None:
        self.z_ub_s = z_ub
        var_z_ub = np.eye(self._n_z_s)
        var_z_ub[var_z_ub == 0] = ca.inf
        var_z_ub[var_z_ub == 1] = ca.inf
        var_z_ub = var_z_ub.flatten().tolist()
        z_ub = z_ub + var_z_ub
        # Get probability of constraint satisfaction
        self._z_ub_p = self._sanity_check_probability_values(kwargs.get('z_ub_p', np.ones(self._n_z_s) * 0.954), 'z_ub')
    if z_lb is not None:
        self.z_lb_s = z_lb
        # Fixed copy-paste bug: previously np.eye(self._n_x_s), inconsistent with the
        # z_ub branch above
        var_z_lb = np.eye(self._n_z_s)
        var_z_lb[var_z_lb == 0] = -ca.inf
        var_z_lb[var_z_lb == 1] = 0
        var_z_lb = var_z_lb.flatten().tolist()
        z_lb = z_lb + var_z_lb
        # Get probability of constraint satisfaction
        self._z_lb_p = self._sanity_check_probability_values(kwargs.get('z_lb_p', np.ones(self._n_z_s) * 0.954), 'z_lb')
    # Note: the deterministic MPC problem takes the bounds also for the covariance elements since the model is
    # expanded by the covariance elements
    super(SMPC, self).set_box_constraints(x_ub=x_ub, x_lb=x_lb, u_ub=u_ub, u_lb=u_lb, y_ub=y_ub, y_lb=y_lb,
                                          z_ub=z_ub,
                                          z_lb=z_lb)
def set_custom_constraints_function(self, fun=None, lb=None, ub=None, soft=False, max_violation=ca.inf):
    """Not yet implemented for the stochastic MPC; always raises NotImplementedError."""
    raise NotImplementedError(
        f"{self.set_custom_constraints_function.__name__} is not yet implemented for {self._type} class. ")
def set_stage_constraints(self, stage_constraint=None, lb=None, ub=None, is_soft=False, max_violation=ca.inf,
                          weight=None, name='stage_constraint'):
    """Not yet implemented for the stochastic MPC; always raises NotImplementedError."""
    raise NotImplementedError(
        f"{self.set_stage_constraints.__name__} is not yet implemented for {self._type} class. ")
def set_terminal_constraints(self, stage_constraint=None, lb=None, ub=None, is_soft=False, max_violation=ca.inf,
                             weight=None, name='stage_constraint'):
    """Not yet implemented for the stochastic MPC; always raises NotImplementedError."""
    raise NotImplementedError(
        f"{self.set_terminal_constraints.__name__} is not yet implemented for {self._type} class. ")
def setup(self, options=None, solver_options=None) -> None:
    """
    Set up the equivalent deterministic NMPC problem.

    Translates the box constraints into chance constraints, augments the stage cost with
    the covariance terms trace(Q*Kx) + trace(R*Ku) and then builds the underlying NLP.

    :param options: NLP options, forwarded to set_nlp_options() and _setup()
    :param solver_options: options forwarded to the NLP solver
    :return:
    """
    self.set_nlp_options(options)
    self._get_chance_constraints()
    # Restrict to the covariance of the original (stochastic) states
    Kx = self.Kx[0:self._n_x_s, 0:self._n_x_s]
    Ku = self.Kgain @ Kx @ self.Kgain.T
    Q = self.quad_stage_cost.Q[0:self._n_x_s, 0:self._n_x_s]
    R = self.quad_stage_cost.R
    # Add covariance component in the objective function
    self.stage_cost.cost = ca.trace(Q @ Kx) + ca.trace(R @ Ku)
    # Setup equivalent deterministic problem
    self._setup(options=options, solver_options=solver_options)
def optimize(self, x0, cp=None, tvp=None, v0=None, runs=0, fix_x0=True, **kwargs):
    """
    Solve the SMPC problem for the given initial state mean and covariance.

    :param x0: initial state (mean values)
    :param cp: constant parameter values (extended internally with the controller gain
        when the gain was not fixed at construction time)
    :param tvp: time-varying parameter values, forwarded to the NMPC
    :param v0: initial guess, forwarded to the NMPC
    :param runs: forwarded to the NMPC
    :param fix_x0: forwarded to the NMPC
    :param kwargs: must contain 'cov_x0' (initial state covariance); must contain
        'Kgain' if the ancillary controller gain was not passed to the constructor
    :raises ValueError: if 'cov_x0' (or a required 'Kgain') is missing
    :return: result of the underlying deterministic NMPC optimization
    """
    cov_x0 = kwargs.get('cov_x0', None)
    if cov_x0 is None:
        # Fixed typo in the error message ("intial")
        raise ValueError("To solve the SMPC you need to provide an initial condition for state covariance values. "
                         "Please pass a 'cov_x0' as well.")
    if self._Kgain_is_set is False:
        Kgain = kwargs.get('Kgain', None)
        if Kgain is None:
            raise ValueError("It looks like you have not passed the gain of the ancillary controller yet. "
                             "Please provide a 'Kgain' to the optimize method.")
        else:
            Kgain = check_and_wrap_to_DM(Kgain)
        # The gain entries enter the surrogate model as (constant) parameters
        kgain = ca.reshape(Kgain, self._n_x_s * self._n_u_s, 1)
        if cp is not None:
            cp = ca.vertcat(cp, kgain)
        else:
            cp = kgain
    x0 = check_and_wrap_to_DM(x0)
    cov_x0 = check_and_wrap_to_DM(cov_x0)
    cov_x0 = ca.reshape(cov_x0, self._n_x_s ** 2, 1)
    # Append the covariance entries to the state vector of the surrogate model
    x0 = ca.vertcat(x0, cov_x0)
    # Fixed: the result of the parent optimization was previously discarded
    return super().optimize(x0, cp=cp, tvp=tvp, v0=v0, runs=runs, fix_x0=fix_x0, **kwargs)
def plot_prediction(self, save_plot=False, plot_dir=None, name_file='mpc_prediction.html', show_plot=True,
                    extras=None, extras_names=None, title=None, format_figure=None, **kwargs):
    """Plot the MPC prediction, restricted to the original (stochastic) states."""
    # Tell the NMPC plotting routine how many of the surrogate model states are the
    # original ones (as opposed to the appended covariance states).
    kwargs['n_x_s'] = self._n_x_s
    super().plot_prediction(save_plot=save_plot, plot_dir=plot_dir, name_file=name_file, show_plot=show_plot,
                            extras=extras, extras_names=extras_names, title=title,
                            format_figure=format_figure, **kwargs)
# Public API of this module.
# NOTE(review): SMPC is defined above but not exported here — confirm whether this is a
# deliberate exclusion (experimental feature) or an omission.
__all__ = [
    'NMPC',
    'LMPC'
]