# # This file is part of HILO-MPC## HILO-MPC is a toolbox for easy, flexible and fast development of machine-learning-supported# optimal control and estimation problems## Copyright (c) 2021 Johannes Pohlodek, Bruno Morabito, Rolf Findeisen# All rights reserved## HILO-MPC is free software: you can redistribute it and/or modify# it under the terms of the GNU Lesser General Public License as# published by the Free Software Foundation, either version 3# of the License, or (at your option) any later version.## HILO-MPC is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Lesser General Public License for more details.## You should have received a copy of the GNU Lesser General Public License# along with HILO-MPC. If not, see <http://www.gnu.org/licenses/>.#fromabcimportABCMeta,abstractmethodfromtypingimportOptional,Unionimportwarningsimportcasadiascaimportnumpyasnpfrom.baseimport_Estimatorfrom..dynamic_model.dynamic_modelimportModelclass_KalmanFilter(_Estimator,metaclass=ABCMeta):""" Kalman filter base class :param model: :param id: The identifier of the KF object. If no identifier is given, a random one will be generated. :param name: The name of the KF object. By default the KF object has no name. :param plot_backend: Plotting library that is used to visualize estimated data. At the moment only `Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default, no plotting library is selected, i.e. no plots can be generated. :param square_root_form: Not used at the moment (will be implemented in the future) """def__init__(self,model:Model,id:Optional[str]=None,name:Optional[str]=None,plot_backend:Optional[str]=None,square_root_form:bool=True)->None:"""Constructor method"""super().__init__(model,id=id,name=name,plot_backend=plot_backend)self._square_root_form=square_root_formself._predict_function=Noneself._update_function=Noneself._is_linearized=False@abstractmethoddef_setup_parameters(self)->None:""" :return: """passdef_setup_predict(self,*args:Optional[ca.Function])->None:""" :param args: :return: """state_matrix=args[0]n_x=self._model.n_xn_u=self._model.n_un_p=self._model.n_px=ca.MX.sym('x',n_x)u=ca.MX.sym('u',n_u)p=ca.MX.sym('p',n_p)P=ca.SX.sym('P',(n_x,n_x))Q=ca.SX.sym('Q',(n_x,n_x))ifself._is_linearized:# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)F=state_matrix(self._model.x,[],self._model.u,self._model.p,self._solution.dt,[])else:# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)F=state_matrix(self._model.p,self._solution.dt,[])ifself._model.discrete:Pdot=F@P@F.T+Qelse:Pdot=F@P+P@F.T+Qmodel=self._model.copy(setup=False)model.add_dynamical_states(P[:])model.add_dynamical_equations(Pdot[:])model.add_parameters(Q[:])ifself._solution.dtisnotNone:model.setup(dt=self._solution.dt)elifself._solution.gridisnotNone:model.setup(grid=self._solution.grid)else:model.setup()self._solution.setup('P',P={'values_or_names':[k.name()forkinP.elements()],'description':P.numel()*[''],'labels':P.numel()*[''],'units':P.numel()*[''],'shape':(P.numel(),0),'data_format':ca.DM})P=ca.MX.sym('P',P.shape)Q=ca.MX.sym('Q',Q.shape)sol=model(x0=ca.vertcat(x,P[:]),p=ca.vertcat(u,p,Q[:]))result=sol['xf']x_pred=result[:n_x]P_pred=ca.reshape(result[n_x:],n_x,n_x)self._predict_function=ca.Function('prediction_step',[ca.horzcat(x,P),ca.vertcat(u,p),Q],[ca.horzcat(x_pred,P_pred)],['x0','p','Q'],['x'])def_setup_update(self,*args:Optional[ca.Function])->None:""" :param args: :return: """output_matrix=args[0]n_x=self._model.n_xn_y=self._model.n_yn_u=self._model.n_un_p=self._model.n_px=ca.MX.sym('x',n_x)u=ca.MX.sym('u',n_u)p=ca.MX.sym('p',n_p)up=ca.vertcat(u,p)P=ca.MX.sym('P',(n_x,n_x))ifn_y==0:y_pred=xn_y=n_xelse:sol=self._model(x0=x,p=up,which='meas_function')y_pred=sol['yf']y=ca.MX.sym('y',n_y)ifself._is_linearized:# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)H=output_matrix(x,[],u,p,self._solution.dt,[])else:# NOTE: For now we ignore time-variant systems (i.e., [] instead of t)H=output_matrix(p,self._solution.dt,[])R=ca.MX.sym('R',(n_y,n_y))P_xy=P@H.TP_yy=H@P@H.T+R# NOTE: Since matrices cannot really be divided (the "/"-operator divides matrices element-wise), a linear# system of equations needs to be solved (solve() in CasADi, which corresponds to the "\"-operator in MATLAB)# K = P_xy*P_yy^-1# K*P_yy = P_xy# P_yy^T*K^T = P_xy^T# K^T = P_yy^T\P_xy^TK=ca.solve(P_yy.T,P_xy.T).Tx_up=x+K@(y-y_pred)P_up=P-K@P_yy@K.Tself._update_function=ca.Function('update_step',[ca.horzcat(x,P),y,up,R],[ca.horzcat(x_up,P_up),y_pred],['x0','y','p','R'],['x','y_pred'])defcheck_consistency(self)->None:""" :return: """super().check_consistency()ifself._predict_functionisnotNone:ifself._predict_function.has_free():free_vars=", ".join(self._predict_function.get_free())msg=f"Instance {self.__class__.__name__} has the following free variables/parameters: {free_vars}"raiseRuntimeError(msg)ifself._update_functionisnotNone:ifself._update_function.has_free():free_vars=", ".join(self._update_function.get_free())msg=f"Instance {self.__class__.__name__} has the following free variables/parameters: {free_vars}"raiseRuntimeError(msg)defsetup(self,**kwargs)->None:""" :param kwargs: :return: """self._solution.setup(self._model.solution)ifnotself._model.is_linear()andself.type=='extended Kalman filter':model=self._model.linearize()self._is_linearized=Trueelse:model=self._model.copy(setup=False)self._is_linearized=Falsen_y=model.n_yifn_y==0:warnings.warn(f"The model has no measurement equations, I am assuming measurements of all states "f"{self._model.dynamical_state_names} are available.")model.set_measurement_equations(model.dynamical_states)ifself.type!='unscented Kalman filter':arg_in=[model.p,model.dt,model.t]ifself._is_linearized:arg_in=[model.x_eq,model.z_eq,model.u_eq]+arg_instate_matrix=ca.Function('state_matrix',arg_in,[model.A])output_matrix=ca.Function('output_matrix',arg_in,[model.C])predict_args=(state_matrix,)update_args=(output_matrix,)else:predict_args=tuple()update_args=tuple()n_x=self._model.n_xn_y=self._model.n_yn_u=self._model.n_un_p=self._model.n_px=ca.MX.sym('x',n_x)y=ca.MX.sym('y',n_y)u=ca.MX.sym('u',n_u)p=ca.MX.sym('p',n_p)up=ca.vertcat(u,p)P=ca.MX.sym('P',(n_x,n_x))Q=ca.MX.sym('Q',(n_x,n_x))R=ca.MX.sym('R',(n_y,n_y))self._setup_parameters()self._setup_update(*update_args)self._setup_predict(*predict_args)prediction=self._predict_function(ca.horzcat(x,P),up,Q)update,y_pred=self._update_function(prediction,y,up,R)self._function=ca.Function('function',[ca.horzcat(x,P),y,up,Q,R],[update,y_pred],['x0','y','p','Q','R'],['x','y_pred'])self.check_consistency()self._n_x=n_xself._n_y=n_y# self._n_z = n_zself._n_u=n_uself._n_p=n_p# self._n_p_est = n_p_estself._process_noise_covariance=ca.DM.zeros(Q.shape)self._measurement_noise_covariance=ca.DM.zeros(R.shape)defestimate(self,*args,**kwargs):""" :param args: :param kwargs: :return: """# TODO: Adjust according to Model.simulateself._check_setup()args=self._process_inputs(**kwargs)tf=args.pop('t0')steps=args.pop('steps')# TODO: Maybe create a TimeSeries specifically tailored to estimation, where get_function_args(.) returns last# error covariance as wellargs['x0']=ca.horzcat(args['x0'],ca.reshape(self._solution.get_by_id('P:f'),self._n_x,self._n_x))ifsteps>1:Q=ca.repmat(self._process_noise_covariance,1,steps)R=ca.repmat(self._measurement_noise_covariance,1,steps)args['Q']=Qargs['R']=Rfunction=self._function.mapaccum(steps)result=function(**args)else:args['Q']=self._process_noise_covarianceargs['R']=self._measurement_noise_covarianceresult=self._function(**args)self._solution.update(t=tf,x=result['x'][:,0],P=result['x'][:,1:][:],y=result['y_pred'])defpredict(self,*args,**kwargs):""" :param args: :param kwargs: :return: """returnself._predict_function(*args,**kwargs)defupdate(self,*args,**kwargs):""" :param args: :param kwargs: :return: """returnself._update_function(*args,**kwargs)
[docs]classKalmanFilter(_KalmanFilter):""" Kalman filter (KF) class for state estimation (parameter estimation will follow soon) :param model: :param id: The identifier of the KF object. If no identifier is given, a random one will be generated. :param name: The name of the KF object. By default the KF object has no name. :param plot_backend: Plotting library that is used to visualize estimated data. At the moment only `Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default no plotting library is selected, i.e. no plots can be generated. :param square_root_form: Not used at the moment (will be implemented in the future) """def__init__(self,model:Model,id:Optional[str]=None,name:Optional[str]=None,plot_backend:Optional[str]=None,square_root_form:bool=True)->None:"""Constructor method"""ifnotmodel.is_linear():raiseValueError("The supplied model is nonlinear. Please use an estimator targeted at the estimation of ""nonlinear systems.")super().__init__(model,id=id,name=name,plot_backend=plot_backend,square_root_form=square_root_form)def_update_type(self)->None:""" :return: """self._type='Kalman filter'def_setup_parameters(self)->None:""" :return: """pass
[docs]classExtendedKalmanFilter(_KalmanFilter):""" Extended Kalman filter (EKF) class for state estimation (parameter estimation will follow soon) :param model: :param id: The identifier of the EKF object. If no identifier is given, a random one will be generated. :param name: The name of the EKF object. By default the EKF object has no name. :param plot_backend: Plotting library that is used to visualize estimated data. At the moment only `Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default no plotting library is selected, i.e. no plots can be generated. :param square_root_form: Not used at the moment (will be implemented in the future) :note: The same methods and properties as for the :py:class:`Kalman filter <.KalmanFilter>` apply """def__init__(self,model:Model,id:Optional[str]=None,name:Optional[str]=None,plot_backend:Optional[str]=None,square_root_form:bool=True)->None:"""Constructor method"""ifmodel.is_linear():warnings.warn("The supplied model is linear. For better efficiency use an observer targeted at the ""estimation of linear systems.")super().__init__(model,id=id,name=name,plot_backend=plot_backend,square_root_form=square_root_form)def_update_type(self)->None:""" :return: """self._type="extended Kalman filter"def_setup_parameters(self)->None:""" :return: """pass
[docs]classUnscentedKalmanFilter(_KalmanFilter):""" Unscented Kalman filter (UKF) class for state estimation (parameter estimation will follow soon) :param model: :param id: The identifier of the UKF object. If no identifier is given, a random one will be generated. :param name: The name of the UKF object. By default the UKF object has no name. :param alpha: :param beta: :param kappa: :param plot_backend: Plotting library that is used to visualize estimated data. At the moment only `Matplotlib <https://matplotlib.org/>`_ and `Bokeh <https://bokeh.org/>`_ are supported. By default no plotting library is selected, i.e. no plots can be generated. :param square_root_form: Not used at the moment (will be implemented in the future) """def__init__(self,model:Model,id:Optional[str]=None,name:Optional[str]=None,alpha:Union[int,float]=None,beta:Union[int,float]=None,kappa:Union[int,float]=None,plot_backend:Optional[str]=None,square_root_form:bool=True)->None:"""Constructor method"""ifmodel.is_linear():warnings.warn("The supplied model is linear. For better efficiency use an observer targeted at the ""estimation of linear systems.")super().__init__(model,id=id,name=name,plot_backend=plot_backend,square_root_form=square_root_form)ifalphaisNone:alpha=.001self._check_parameter_bounds('alpha',alpha)ifbetaisNone:beta=2.self._check_parameter_bounds('beta',beta)ifkappaisNone:kappa=0.self._check_parameter_bounds('kappa',kappa)self._lambda=Noneself._gamma=Noneself._weights=Noneself._sqrt=Nonedef_update_type(self)->None:""" :return: """self._type="unscented Kalman filter"def_check_parameter_bounds(self,param:str,value:Union[int,float])->None:""" :param param: :param value: :return: """ifparam=='alpha':ifvalue<=0.orvalue>1.:raiseValueError(f"The parameter alpha needs to lie in the interval (0, 1]. Supplied alpha is {value}.")self._alpha=valueelifparam=='beta':self._beta=valueelifparam=='kappa':ifvalue<0:raiseValueError(f"The parameter kappa needs to be greater or equal to 0. Supplied kappa is {value}.")self._kappa=valuedef_setup_parameters(self)->None:""" :return: """n_x=self._model.n_xself._lambda=self._alpha**2*(n_x+self._kappa)-n_xself._gamma=np.sqrt(n_x+self._lambda)weights=np.zeros((2,2*n_x+1))weights[0,0]=self._lambda/(n_x+self._lambda)weights[1,0]=self._lambda/(n_x+self._lambda)+1-self._alpha**2+self._betaweights[:,1:]=1/(2*(n_x+self._lambda))self._weights=weightsa=ca.SX.sym('a',(n_x,n_x))self._sqrt=ca.Function('sqrt',[a],[ca.chol(a)])def_setup_predict(self,*args:Optional[ca.Function])->None:""" :param args: :return: """n_x=self._model.n_xn_u=self._model.n_un_p=self._model.n_px=ca.MX.sym('x',n_x)u=ca.MX.sym('u',n_u)p=ca.MX.sym('p',n_p)up=ca.vertcat(u,p)P=ca.MX.sym('P',(n_x,n_x))Q=ca.MX.sym('Q',(n_x,n_x))P_sqrt=self._sqrt(P)X=[x]forkinrange(n_x):X.append(x+self._gamma*P_sqrt[:,k])forkinrange(n_x):X.append(x-self._gamma*P_sqrt[:,k])X=ca.horzcat(*X)self._solution.setup('P',P={'values_or_names':[P.name()+'_'+str(k)forkinrange(P.numel())],'description':P.numel()*[''],'labels':P.numel()*[''],'units':P.numel()*[''],'shape':(P.numel(),0),'data_format':ca.DM})sol=self._model(x0=X,p=up)X=sol['xf']x_pred=ca.MX(0)forkinrange(2*n_x+1):x_pred+=self._weights[0,k]*X[:,k]P_pred=Qforkinrange(2*n_x+1):P_pred+=self._weights[1,k]*(X[:,k]-x_pred)@(X[:,k]-x_pred).Tself._predict_function=ca.Function('prediction_step',[ca.horzcat(x,P),up,Q],[ca.horzcat(x_pred,P_pred,X)],['x0','p','Q'],['x'])def_setup_update(self,*args:Optional[ca.Function])->None:""" :param args: :return: """n_x=self._model.n_xn_y=self._model.n_yn_u=self._model.n_un_p=self._model.n_px=ca.MX.sym('x',n_x)X=ca.MX.sym('X',n_x,2*n_x+1)y=ca.MX.sym('y',n_y)u=ca.MX.sym('u',n_u)p=ca.MX.sym('p',n_p)up=ca.vertcat(u,p)P=ca.MX.sym('P',(n_x,n_x))ifn_y==0:warnings.warn(f"The model has no measurement equations, I am assuming measurements of all states "f"{self._model.dynamical_state_names} are available.")Y=Xelse:sol=self._model(x0=X,p=up,which='meas_function')Y=sol['yf']y_pred=ca.MX(0)forkinrange(2*n_x+1):y_pred+=self._weights[0,k]*Y[:,k]R=ca.MX.sym('R',(n_y,n_y))P_xy=ca.MX(0)P_yy=Rforkinrange(2*n_x+1):P_xy+=self._weights[1,k]*(X[:,k]-x)@(Y[:,k]-y_pred).TP_yy+=self._weights[1,k]*(Y[:,k]-y_pred)@(Y[:,k]-y_pred).T# NOTE: See NOTE in KalmanFilter.setup()K=ca.solve(P_yy.T,P_xy.T).Tx_up=x+K@(y-y_pred)P_up=P-K@P_yy@K.Tself._update_function=ca.Function('update_step',[ca.horzcat(x,P,X),y,up,R],[ca.horzcat(x_up,P_up),y_pred],['x0','y','p','R'],['x','y_pred'])@propertydefalpha(self)->float:""" :return: """returnfloat(self._alpha)@alpha.setterdefalpha(self,arg:Union[int,float])->None:self._check_parameter_bounds('alpha',arg)@propertydefbeta(self)->float:""" :return: """returnfloat(self._beta)@beta.setterdefbeta(self,arg:Union[int,float])->None:self._check_parameter_bounds('beta',arg)@propertydefkappa(self)->float:""" :return: """returnfloat(self._kappa)@kappa.setterdefkappa(self,arg:Union[int,float])->None:self._check_parameter_bounds('kappa',arg)