from typing import Callable, Optional
import warnings
import casadi as ca
import numpy as np
from scipy.stats import norm
from .base import _Estimator
from ..dynamic_model.dynamic_model import Model
from ...util.util import convert
[docs]class ParticleFilter(_Estimator):
Particle filter (PF) class for state estimation
:param model:
:param id: The identifier of the PF object. If no identifier is given, a random one will be generated.
:param name: The name of the PF object. By default the PF object has no name.
:param plot_backend: Plotting library that is used to visualize estimated data. At the moment only
`Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default no plotting
library is selected, i.e. no plots can be generated.
:param variant:
:param roughening:
:param prior_editing:
:param kwargs:
def __init__(
model: Model,
id: Optional[str] = None,
name: Optional[str] = None,
plot_backend: Optional[str] = None,
variant: Optional[str] = None,
roughening: bool = False,
prior_editing: bool = False,
"""Constructor method"""
if model.is_linear():
warnings.warn("The supplied model is linear. For better efficiency use an observer targeted at the "
"estimation of linear systems.")
super().__init__(model, id=id, name=name, plot_backend=plot_backend)
self._variant = variant
self._roughening = roughening
self._prior_editing = prior_editing
if self._roughening or self._prior_editing:
K = kwargs.get('K')
if K is None:
K = .2
self._roughening_tuning_param = K
self._sample_size = 15
self._pdf = lhsnorm
self._transpose_pdf = None
def _update_type(self) -> None:
self._type = 'particle filter'
def _setup_normpdf(self):
x = ca.SX.sym('x')
mu = ca.SX.sym('mu')
sigma = ca.SX.sym('sigma')
y = ca.exp(-.5 * ((x - mu) / sigma) ** 2) / (ca.sqrt(2 * ca.pi) * sigma)
self._normpdf = ca.Function('normpdf', [x, mu, sigma], [y])
def _propagate_particles(self, n_samples):
:param n_samples:
n_x = self._model.n_x
n_y = self._model.n_y
n_u = self._model.n_u
n_p = self._model.n_p
X = ca.MX.sym('X', n_x, n_samples)
u = ca.MX.sym('u', n_u)
p = ca.MX.sym('p', n_p)
up = ca.vertcat(u, p)
self._solution.setup('X', X={
'values_or_names': [X.name() + '_' + str(k) for k in range(X.numel())],
'description': X.numel() * [''],
'labels': X.numel() * [''],
'units': X.numel() * [''],
'shape': (X.numel(), 0),
'data_format': ca.DM
sol = self._model(x0=X, p=up)
X_prop = sol['xf']
if n_y == 0:
warnings.warn(f"The model has no measurement equations, I am assuming measurements of all states "
f"{self._model.dynamical_state_names} are available.")
Y = X_prop
Y = sol['yf']
w = ca.MX.sym('w', n_x, n_samples)
v = ca.MX.sym('v', n_y, n_samples)
# NOTE: At the moment only additive noise is supported
X_prop += w
Y += v
self._predict_function = ca.Function('propagation_step',
[X, up, w, v],
[X_prop, Y],
['X', 'p', 'w', 'v'],
['X_prop', 'Y'])
def _evaluate_likelihood(self, n_samples):
n_y = self._model.n_y
y = ca.SX.sym('y', n_y)
Y = ca.SX.sym('Y', n_y, n_samples)
R = ca.SX.sym('R', (n_y, n_y))
q = self._normpdf(Y, y, ca.sqrt(R)) # Since R is usually a diagonal matrix, we can use ca.sqrt() here. We need
# to change this, if we have non-diagonal matrices, i.e. covariance entries.
q /= ca.sum2(q)
self._update_function = ca.Function('likelihood',
[y, Y, R],
['y', 'Y', 'R'],
def _initial_sample(self) -> None:
P0 = ca.reshape(self._solution.get_by_id('P:f'), self._n_x, self._n_x)
X = self._pdf(self._solution.get_by_id('x:0').full().flatten(), P0, self._sample_size)
if self._transpose_pdf:
X = X.T
elif self._transpose_pdf is None:
if X.shape != (2, self._sample_size):
X = X.T
if X.shape != (2, self._sample_size):
raise ValueError(f"Dimension mismatch. Expected dimension 2x{self._sample_size}, got "
self._transpose_pdf = True
self._transpose_pdf = False
X = convert(X, ca.DM)
self._solution.set('X', X[:])
def probability_density_function(self) -> Callable[[np.ndarray, np.ndarray, int], np.ndarray]:
return self._pdf
def probability_density_function(self, pdf: Callable[[np.ndarray, np.ndarray, int], np.ndarray]) -> None:
:param pdf:
if not callable(pdf):
raise ValueError(f"Probability density function of the {self.type} needs to be callable.")
annotations = pdf.__annotations__
run_function = True
if annotations:
if len(annotations) == 4 and 'return' in annotations:
types = [np.ndarray, np.ndarray, int, np.ndarray]
args = ["mean", "covariance", "sample size"]
for k, type_ in enumerate(annotations.values()):
if type_ is not types[k]:
if k == 0:
no = "1st"
elif k == 1:
no = "2nd"
no = "3rd"
if k < 3:
raise TypeError(f"The {no} argument to the probability density function (pdf) needs to be "
f"the '{args[k]}' with type {types[k].__name__}.")
raise TypeError(f"The return value of the probability density function (pdf) needs to be a"
f" 'random sample' with type {types[k].__name__}.")
run_function = False
if run_function:
X = pdf(np.zeros(self._n_x), np.eye(self._n_x), self._sample_size)
if X.shape != (self._n_x, self._sample_size):
if X.shape != (self._sample_size, self._n_x):
raise ValueError(f"Dimension mismatch. Expected dimension {self._n_x}x{self._sample_size}, got "
self._transpose_pdf = True
self._transpose_pdf = False
except Exception as err:
raise RuntimeError(f"The following exception was raised\n"
f" {type(err).__name__}: '{err.args[0]}'.\nPlease make sure that the "
f"supplied probability density function (pdf) has the following arguments\n"
f" mu - mean of the pdf (type: numpy.ndarray),\n"
f" sigma - covariance of the mean (type: numpy.ndarray),\n"
f" n - sample size (type: int),\n"
f"and the following return value\n"
f" X - random sample (type: numpy.ndarray).")
self._pdf = pdf
pdf = probability_density_function
def variant(self) -> str:
return self._variant
def variant(self, variant):
self._variant = variant
def sample_size(self):
return self._sample_size
def sample_size(self, sample_size):
self._sample_size = sample_size
n_samples = sample_size
[docs] def setup(self, **kwargs) -> None:
:param kwargs:
n_s = kwargs.get('n_samples')
if n_s is None:
n_s = self._sample_size
self._sample_size = n_s
n_x = self._model.n_x
n_y = self._model.n_y
n_u = self._model.n_u
n_p = self._model.n_p
n_P = n_x * n_x
self._solution.setup('P', P={
'values_or_names': ['P_' + str(k) for k in range(n_P)],
'description': n_P * [''],
'labels': n_P * [''],
'units': n_P * [''],
'shape': (n_P, 0),
'data_format': ca.DM
X = ca.MX.sym('X', n_x, n_s)
y = ca.MX.sym('y', n_y)
u = ca.MX.sym('u', n_u)
p = ca.MX.sym('p', n_p)
up = ca.vertcat(u, p)
w = ca.MX.sym('w', n_x, n_s)
v = ca.MX.sym('v', n_y, n_s)
Q = ca.MX.sym('Q', (n_x, n_x))
R = ca.MX.sym('R', (n_y, n_y))
prediction = self._predict_function(X=X, p=up, w=w, v=v)
X_prop = prediction['X_prop']
Y = prediction['Y']
update = self._update_function(y=y, Y=Y, R=R)
self._function = ca.Function('function',
[X, y, up, w, v, R],
[X_prop, Y, update['q']],
['X', 'y', 'p', 'w', 'v', 'R'],
['X_prop', 'Y', 'q'])
self._n_x = n_x
self._n_y = n_y
# self._n_z = n_z
self._n_u = n_u
self._n_p = n_p
# self._n_p_est = n_p_est
self._process_noise_covariance = ca.DM.zeros(Q.shape)
self._measurement_noise_covariance = ca.DM.zeros(R.shape)
[docs] def estimate(self, *args, **kwargs):
:param args:
:param kwargs:
# TODO: Adjust according to Model.simulate
kwargs['skip'] = ['x']
args = self._process_inputs(**kwargs)
tf = args.pop('t0')
steps = args.pop('steps')
if self._solution.get_by_id('X').is_empty():
args['X'] = ca.reshape(self._solution.get_by_id('X:f'), self._n_x, self._sample_size)
if steps > 1:
raise NotImplementedError("Particle filter is not yet implemented with steps > 1.")
R = self._measurement_noise_covariance
w = self._pdf(np.zeros(self._n_x), self._process_noise_covariance.full(), self._sample_size)
# v = self._pdf(np.zeros(self._n_y), self._measurement_noise_covariance.full(), self._sample_size)
v = ca.sqrt(R) @ np.random.randn(self._n_y, self._sample_size) # Only possible since R is usually a
# diagonal matrix (see self._evaluate_likelihood)
if self._transpose_pdf:
w = w.T
# v = v.T
args['w'] = w
args['v'] = v
args['R'] = R
result = self._function(**args)
Y = result['Y']
# Prior editing
if self._prior_editing:
mag = args['y'] - Y
need_roughening = ca.fabs(mag) > 6 * ca.sqrt(R)
n_r = int(ca.sum2(need_roughening))
while n_r > 0:
# TODO: Check out prior editing in more detail. Should we only use the X that are indexed by
# need_roughening for the calculation of dx? Do we roughen already roughened X, or do we just
# replace the ones that were improved by the roughening and keep the other ones at their original
# value?
dx = np.max(args['X'], axis=1) - np.min(args['X'], axis=1)
dx = self._pdf(np.zeros(self._n_x), self._roughening_tuning_param * np.diag(dx) *
n_r ** (-1 / self._n_x), n_r)
if self._transpose_pdf:
dx = dx.T
mask = np.where(need_roughening == 1)[1]
args['X'][:, mask] += dx
result = self._function(**args)
Y = result['Y']
mag = args['y'] - Y
need_roughening = ca.fabs(mag) > 6 * ca.sqrt(R)
n_r = int(ca.sum2(need_roughening))
X = result['X_prop']
q = result['q']
# Resample (Survival of the fittest)
ind = np.random.choice(self._sample_size, size=self._sample_size, replace=True, p=q.full().flatten())
X = X[:, ind]
Y = Y[:, ind]
# Roughening
if self._roughening:
dx = np.max(X, axis=1) - np.min(X, axis=1)
dx = self._pdf(np.zeros(self._n_x), self._roughening_tuning_param * np.diag(dx) *
self._sample_size ** (-1 / self._n_x), self._sample_size)
if self._transpose_pdf:
dx = dx.T
X += dx
x = ca.sum2(X) / self._sample_size
y = ca.sum2(Y) / self._sample_size
P = convert(np.cov(X), ca.DM)
self._solution.update(t=tf, x=x, X=X[:], P=P[:], y=y)
def lhsnorm(mu, sigma, n):
:param mu:
:param sigma:
:param n:
# TODO: Migrate to somewhere in util
n_m = mu.size
z = np.random.multivariate_normal(mu, sigma, size=n)
x = np.zeros_like(z, dtype=z.dtype)
idz = np.argsort(z, axis=0)
for k in range(n_m):
x[idz[:, k], k] = np.linspace(1, n, n)
x -= np.random.rand(*x.shape)
x /= n
for k in range(n_m):
x[:, k] = norm.ppf(x[:, k], loc=mu[k], scale=np.sqrt(sigma[k, k]))
return x
__all__ = [