#
# This file is part of HILO-MPC
#
# HILO-MPC is a toolbox for easy, flexible and fast development of machine-learning-supported
# optimal control and estimation problems
#
# Copyright (c) 2021 Johannes Pohlodek, Bruno Morabito, Rolf Findeisen
# All rights reserved
#
# HILO-MPC is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# HILO-MPC is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with HILO-MPC. If not, see <http://www.gnu.org/licenses/>.
#
from abc import ABCMeta, abstractmethod
from typing import Optional, Union
import warnings
import casadi as ca
import numpy as np
from .base import _Estimator
from ..dynamic_model.dynamic_model import Model
class _KalmanFilter(_Estimator, metaclass=ABCMeta):
"""
Kalman filter base class
:param model:
:param id: The identifier of the KF object. If no identifier is given, a random one will be generated.
:param name: The name of the KF object. By default the KF object has no name.
:param plot_backend: Plotting library that is used to visualize estimated data. At the moment only
`Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default, no plotting
library is selected, i.e. no plots can be generated.
:param square_root_form: Not used at the moment (will be implemented in the future)
"""
def __init__(
self,
model: Model,
id: Optional[str] = None,
name: Optional[str] = None,
plot_backend: Optional[str] = None,
square_root_form: bool = True
) -> None:
"""Constructor method"""
super().__init__(model, id=id, name=name, plot_backend=plot_backend)
self._square_root_form = square_root_form
self._predict_function = None
self._update_function = None
self._is_linearized = False
@abstractmethod
def _setup_parameters(self) -> None:
"""
:return:
"""
pass
def _setup_predict(self, *args: Optional[ca.Function]) -> None:
"""
:param args:
:return:
"""
state_matrix = args[0]
n_x = self._model.n_x
n_u = self._model.n_u
n_p = self._model.n_p
x = ca.MX.sym('x', n_x)
u = ca.MX.sym('u', n_u)
p = ca.MX.sym('p', n_p)
P = ca.SX.sym('P', (n_x, n_x))
Q = ca.SX.sym('Q', (n_x, n_x))
if self._is_linearized:
# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)
F = state_matrix(self._model.x, [], self._model.u, self._model.p, self._solution.dt, [])
else:
# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)
F = state_matrix(self._model.p, self._solution.dt, [])
if self._model.discrete:
Pdot = F @ P @ F.T + Q
else:
Pdot = F @ P + P @ F.T + Q
model = self._model.copy(setup=False)
model.add_dynamical_states(P[:])
model.add_dynamical_equations(Pdot[:])
model.add_parameters(Q[:])
if self._solution.dt is not None:
model.setup(dt=self._solution.dt)
elif self._solution.grid is not None:
model.setup(grid=self._solution.grid)
else:
model.setup()
self._solution.setup('P', P={
'values_or_names': [k.name() for k in P.elements()],
'description': P.numel() * [''],
'labels': P.numel() * [''],
'units': P.numel() * [''],
'shape': (P.numel(), 0),
'data_format': ca.DM
})
P = ca.MX.sym('P', P.shape)
Q = ca.MX.sym('Q', Q.shape)
sol = model(x0=ca.vertcat(x, P[:]), p=ca.vertcat(u, p, Q[:]))
result = sol['xf']
x_pred = result[:n_x]
P_pred = ca.reshape(result[n_x:], n_x, n_x)
self._predict_function = ca.Function('prediction_step',
[ca.horzcat(x, P), ca.vertcat(u, p), Q],
[ca.horzcat(x_pred, P_pred)],
['x0', 'p', 'Q'],
['x'])
def _setup_update(self, *args: Optional[ca.Function]) -> None:
"""
:param args:
:return:
"""
output_matrix = args[0]
n_x = self._model.n_x
n_y = self._model.n_y
n_u = self._model.n_u
n_p = self._model.n_p
x = ca.MX.sym('x', n_x)
u = ca.MX.sym('u', n_u)
p = ca.MX.sym('p', n_p)
up = ca.vertcat(u, p)
P = ca.MX.sym('P', (n_x, n_x))
if n_y == 0:
y_pred = x
n_y = n_x
else:
sol = self._model(x0=x, p=up, which='meas_function')
y_pred = sol['yf']
y = ca.MX.sym('y', n_y)
if self._is_linearized:
# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)
H = output_matrix(x, [], u, p, self._solution.dt, [])
else:
# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)
H = output_matrix(p, self._solution.dt, [])
R = ca.MX.sym('R', (n_y, n_y))
P_xy = P @ H.T
P_yy = H @ P @ H.T + R
# NOTE: Since matrices cannot really be divided (the "/"-operator divides matrices element-wise), a linear
# system of equations needs to be solved (solve() in CasADi, which corresponds to the "\"-operator in MATLAB)
# K = P_xy*P_yy^-1
# K*P_yy = P_xy
# P_yy^T*K^T = P_xy^T
# K^T = P_yy^T\P_xy^T
K = ca.solve(P_yy.T, P_xy.T).T
x_up = x + K @ (y - y_pred)
P_up = P - K @ P_yy @ K.T
self._update_function = ca.Function('update_step',
[ca.horzcat(x, P), y, up, R],
[ca.horzcat(x_up, P_up), y_pred],
['x0', 'y', 'p', 'R'],
['x', 'y_pred'])
def check_consistency(self) -> None:
"""
:return:
"""
super().check_consistency()
if self._predict_function is not None:
if self._predict_function.has_free():
free_vars = ", ".join(self._predict_function.get_free())
msg = f"Instance {self.__class__.__name__} has the following free variables/parameters: {free_vars}"
raise RuntimeError(msg)
if self._update_function is not None:
if self._update_function.has_free():
free_vars = ", ".join(self._update_function.get_free())
msg = f"Instance {self.__class__.__name__} has the following free variables/parameters: {free_vars}"
raise RuntimeError(msg)
def setup(self, **kwargs) -> None:
"""
:param kwargs:
:return:
"""
self._solution.setup(self._model.solution)
if not self._model.is_linear() and self.type == 'extended Kalman filter':
model = self._model.linearize()
self._is_linearized = True
else:
model = self._model.copy(setup=False)
self._is_linearized = False
n_y = model.n_y
if n_y == 0:
warnings.warn(f"The model has no measurement equations, I am assuming measurements of all states "
f"{self._model.dynamical_state_names} are available.")
model.set_measurement_equations(model.dynamical_states)
if self.type != 'unscented Kalman filter':
arg_in = [model.p, model.dt, model.t]
if self._is_linearized:
arg_in = [model.x_eq, model.z_eq, model.u_eq] + arg_in
state_matrix = ca.Function('state_matrix', arg_in, [model.A])
output_matrix = ca.Function('output_matrix', arg_in, [model.C])
predict_args = (state_matrix,)
update_args = (output_matrix,)
else:
predict_args = tuple()
update_args = tuple()
n_x = self._model.n_x
n_y = self._model.n_y
n_u = self._model.n_u
n_p = self._model.n_p
x = ca.MX.sym('x', n_x)
y = ca.MX.sym('y', n_y)
u = ca.MX.sym('u', n_u)
p = ca.MX.sym('p', n_p)
up = ca.vertcat(u, p)
P = ca.MX.sym('P', (n_x, n_x))
Q = ca.MX.sym('Q', (n_x, n_x))
R = ca.MX.sym('R', (n_y, n_y))
self._setup_parameters()
self._setup_update(*update_args)
self._setup_predict(*predict_args)
prediction = self._predict_function(ca.horzcat(x, P), up, Q)
update, y_pred = self._update_function(prediction, y, up, R)
self._function = ca.Function('function',
[ca.horzcat(x, P), y, up, Q, R],
[update, y_pred],
['x0', 'y', 'p', 'Q', 'R'],
['x', 'y_pred'])
self.check_consistency()
self._n_x = n_x
self._n_y = n_y
# self._n_z = n_z
self._n_u = n_u
self._n_p = n_p
# self._n_p_est = n_p_est
self._process_noise_covariance = ca.DM.zeros(Q.shape)
self._measurement_noise_covariance = ca.DM.zeros(R.shape)
def estimate(self, *args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
# TODO: Adjust according to Model.simulate
self._check_setup()
args = self._process_inputs(**kwargs)
tf = args.pop('t0')
steps = args.pop('steps')
# TODO: Maybe create a TimeSeries specifically tailored to estimation, where get_function_args(.) returns last
# error covariance as well
args['x0'] = ca.horzcat(args['x0'], ca.reshape(self._solution.get_by_id('P:f'), self._n_x, self._n_x))
if steps > 1:
Q = ca.repmat(self._process_noise_covariance, 1, steps)
R = ca.repmat(self._measurement_noise_covariance, 1, steps)
args['Q'] = Q
args['R'] = R
function = self._function.mapaccum(steps)
result = function(**args)
else:
args['Q'] = self._process_noise_covariance
args['R'] = self._measurement_noise_covariance
result = self._function(**args)
self._solution.update(t=tf, x=result['x'][:, 0], P=result['x'][:, 1:][:], y=result['y_pred'])
def predict(self, *args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
return self._predict_function(*args, **kwargs)
def update(self, *args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
return self._update_function(*args, **kwargs)
[docs]class KalmanFilter(_KalmanFilter):
"""
Kalman filter (KF) class for state estimation (parameter estimation will follow soon)
:param model:
:param id: The identifier of the KF object. If no identifier is given, a random one will be generated.
:param name: The name of the KF object. By default the KF object has no name.
:param plot_backend: Plotting library that is used to visualize estimated data. At the moment only
`Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default no plotting
library is selected, i.e. no plots can be generated.
:param square_root_form: Not used at the moment (will be implemented in the future)
"""
def __init__(
self,
model: Model,
id: Optional[str] = None,
name: Optional[str] = None,
plot_backend: Optional[str] = None,
square_root_form: bool = True
) -> None:
"""Constructor method"""
if not model.is_linear():
raise ValueError("The supplied model is nonlinear. Please use an estimator targeted at the estimation of "
"nonlinear systems.")
super().__init__(model, id=id, name=name, plot_backend=plot_backend, square_root_form=square_root_form)
def _update_type(self) -> None:
"""
:return:
"""
self._type = 'Kalman filter'
def _setup_parameters(self) -> None:
"""
:return:
"""
pass
[docs]class ExtendedKalmanFilter(_KalmanFilter):
"""
Extended Kalman filter (EKF) class for state estimation (parameter estimation will follow soon)
:param model:
:param id: The identifier of the EKF object. If no identifier is given, a random one will be generated.
:param name: The name of the EKF object. By default the EKF object has no name.
:param plot_backend: Plotting library that is used to visualize estimated data. At the moment only
`Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default no plotting
library is selected, i.e. no plots can be generated.
:param square_root_form: Not used at the moment (will be implemented in the future)
:note: The same methods and properties as for the :py:class:`Kalman filter <.KalmanFilter>` apply
"""
def __init__(
self,
model: Model,
id: Optional[str] = None,
name: Optional[str] = None,
plot_backend: Optional[str] = None,
square_root_form: bool = True
) -> None:
"""Constructor method"""
if model.is_linear():
warnings.warn("The supplied model is linear. For better efficiency use an observer targeted at the "
"estimation of linear systems.")
super().__init__(model, id=id, name=name, plot_backend=plot_backend, square_root_form=square_root_form)
def _update_type(self) -> None:
"""
:return:
"""
self._type = "extended Kalman filter"
def _setup_parameters(self) -> None:
"""
:return:
"""
pass
[docs]class UnscentedKalmanFilter(_KalmanFilter):
"""
Unscented Kalman filter (UKF) class for state estimation (parameter estimation will follow soon)
:param model:
:param id: The identifier of the UKF object. If no identifier is given, a random one will be generated.
:param name: The name of the UKF object. By default the UKF object has no name.
:param alpha:
:param beta:
:param kappa:
:param plot_backend: Plotting library that is used to visualize estimated data. At the moment only
`Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default no plotting
library is selected, i.e. no plots can be generated.
:param square_root_form: Not used at the moment (will be implemented in the future)
"""
def __init__(
self,
model: Model,
id: Optional[str] = None,
name: Optional[str] = None,
alpha: Union[int, float] = None,
beta: Union[int, float] = None,
kappa: Union[int, float] = None,
plot_backend: Optional[str] = None,
square_root_form: bool = True
) -> None:
"""Constructor method"""
if model.is_linear():
warnings.warn("The supplied model is linear. For better efficiency use an observer targeted at the "
"estimation of linear systems.")
super().__init__(model, id=id, name=name, plot_backend=plot_backend, square_root_form=square_root_form)
if alpha is None:
alpha = .001
self._check_parameter_bounds('alpha', alpha)
if beta is None:
beta = 2.
self._check_parameter_bounds('beta', beta)
if kappa is None:
kappa = 0.
self._check_parameter_bounds('kappa', kappa)
self._lambda = None
self._gamma = None
self._weights = None
self._sqrt = None
def _update_type(self) -> None:
"""
:return:
"""
self._type = "unscented Kalman filter"
def _check_parameter_bounds(self, param: str, value: Union[int, float]) -> None:
"""
:param param:
:param value:
:return:
"""
if param == 'alpha':
if value <= 0. or value > 1.:
raise ValueError(f"The parameter alpha needs to lie in the interval (0, 1]. Supplied alpha is {value}.")
self._alpha = value
elif param == 'beta':
self._beta = value
elif param == 'kappa':
if value < 0:
raise ValueError(f"The parameter kappa needs to be greater or equal to 0. Supplied kappa is {value}.")
self._kappa = value
def _setup_parameters(self) -> None:
"""
:return:
"""
n_x = self._model.n_x
self._lambda = self._alpha ** 2 * (n_x + self._kappa) - n_x
self._gamma = np.sqrt(n_x + self._lambda)
weights = np.zeros((2, 2 * n_x + 1))
weights[0, 0] = self._lambda / (n_x + self._lambda)
weights[1, 0] = self._lambda / (n_x + self._lambda) + 1 - self._alpha ** 2 + self._beta
weights[:, 1:] = 1 / (2 * (n_x + self._lambda))
self._weights = weights
a = ca.SX.sym('a', (n_x, n_x))
self._sqrt = ca.Function('sqrt', [a], [ca.chol(a)])
def _setup_predict(self, *args: Optional[ca.Function]) -> None:
"""
:param args:
:return:
"""
n_x = self._model.n_x
n_u = self._model.n_u
n_p = self._model.n_p
x = ca.MX.sym('x', n_x)
u = ca.MX.sym('u', n_u)
p = ca.MX.sym('p', n_p)
up = ca.vertcat(u, p)
P = ca.MX.sym('P', (n_x, n_x))
Q = ca.MX.sym('Q', (n_x, n_x))
P_sqrt = self._sqrt(P)
X = [x]
for k in range(n_x):
X.append(x + self._gamma * P_sqrt[:, k])
for k in range(n_x):
X.append(x - self._gamma * P_sqrt[:, k])
X = ca.horzcat(*X)
self._solution.setup('P', P={
'values_or_names': [P.name() + '_' + str(k) for k in range(P.numel())],
'description': P.numel() * [''],
'labels': P.numel() * [''],
'units': P.numel() * [''],
'shape': (P.numel(), 0),
'data_format': ca.DM
})
sol = self._model(x0=X, p=up)
X = sol['xf']
x_pred = ca.MX(0)
for k in range(2 * n_x + 1):
x_pred += self._weights[0, k] * X[:, k]
P_pred = Q
for k in range(2 * n_x + 1):
P_pred += self._weights[1, k] * (X[:, k] - x_pred) @ (X[:, k] - x_pred).T
self._predict_function = ca.Function('prediction_step',
[ca.horzcat(x, P), up, Q],
[ca.horzcat(x_pred, P_pred, X)],
['x0', 'p', 'Q'],
['x'])
def _setup_update(self, *args: Optional[ca.Function]) -> None:
"""
:param args:
:return:
"""
n_x = self._model.n_x
n_y = self._model.n_y
n_u = self._model.n_u
n_p = self._model.n_p
x = ca.MX.sym('x', n_x)
X = ca.MX.sym('X', n_x, 2 * n_x + 1)
y = ca.MX.sym('y', n_y)
u = ca.MX.sym('u', n_u)
p = ca.MX.sym('p', n_p)
up = ca.vertcat(u, p)
P = ca.MX.sym('P', (n_x, n_x))
if n_y == 0:
warnings.warn(f"The model has no measurement equations, I am assuming measurements of all states "
f"{self._model.dynamical_state_names} are available.")
Y = X
else:
sol = self._model(x0=X, p=up, which='meas_function')
Y = sol['yf']
y_pred = ca.MX(0)
for k in range(2 * n_x + 1):
y_pred += self._weights[0, k] * Y[:, k]
R = ca.MX.sym('R', (n_y, n_y))
P_xy = ca.MX(0)
P_yy = R
for k in range(2 * n_x + 1):
P_xy += self._weights[1, k] * (X[:, k] - x) @ (Y[:, k] - y_pred).T
P_yy += self._weights[1, k] * (Y[:, k] - y_pred) @ (Y[:, k] - y_pred).T
# NOTE: See NOTE in KalmanFilter.setup()
K = ca.solve(P_yy.T, P_xy.T).T
x_up = x + K @ (y - y_pred)
P_up = P - K @ P_yy @ K.T
self._update_function = ca.Function('update_step',
[ca.horzcat(x, P, X), y, up, R],
[ca.horzcat(x_up, P_up), y_pred],
['x0', 'y', 'p', 'R'],
['x', 'y_pred'])
@property
def alpha(self) -> float:
"""
:return:
"""
return float(self._alpha)
@alpha.setter
def alpha(self, arg: Union[int, float]) -> None:
self._check_parameter_bounds('alpha', arg)
@property
def beta(self) -> float:
"""
:return:
"""
return float(self._beta)
@beta.setter
def beta(self, arg: Union[int, float]) -> None:
self._check_parameter_bounds('beta', arg)
@property
def kappa(self) -> float:
"""
:return:
"""
return float(self._kappa)
@kappa.setter
def kappa(self, arg: Union[int, float]) -> None:
self._check_parameter_bounds('kappa', arg)
__all__ = [
'KalmanFilter',
'ExtendedKalmanFilter',
'UnscentedKalmanFilter'
]