# This file is part of HILO-MPC
# HILO-MPC is a toolbox for easy, flexible and fast development of machine-learning-supported
# optimal control and estimation problems
# Copyright (c) 2021 Johannes Pohlodek, Bruno Morabito, Rolf Findeisen
# All rights reserved
# HILO-MPC is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
# HILO-MPC is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with HILO-MPC. If not, see <http://www.gnu.org/licenses/>.
from typing import Optional, Sequence, Union
import casadi as ca
import numpy as np
from .base import Controller
from ..base import Base, TimeSeries
from ...util.util import convert, is_diagonal
Numeric = Union[int, float]
Array = Union[Sequence[Numeric], np.ndarray]
[docs]class PID(Controller, Base):
Class for PID controller
:param n_set_points:
:type n_set_points: int
:param id:
:type id: str, optional
:param name:
:type name: str, optional
:param k_p:
:type k_p:
:param t_i:
:type t_i:
:param t_d:
:type t_d:
:param proportional_on_process_value:
:type proportional_on_process_value: bool
:param derivative_on_process_value:
:type derivative_on_process_value: bool
:param plot_backend:
:type plot_backend: str, optional
def __init__(
n_set_points: int = 1,
id: Optional[str] = None,
name: Optional[str] = None,
k_p: Optional[Union[Numeric, Array]] = None,
t_i: Optional[Union[Numeric, Array]] = None,
t_d: Optional[Union[Numeric, Array]] = None,
proportional_on_process_value: bool = False,
derivative_on_process_value: bool = False,
plot_backend: Optional[str] = None
) -> None:
"""Constructor method"""
super().__init__(id=id, name=name)
if self._id is None:
self._n_set_points = n_set_points
if k_p is None:
k_p = self._n_set_points * [1.]
self._set_tuning_parameter('k_p', k_p)
if t_i is None:
t_i = self._n_set_points * [np.inf]
self._set_tuning_parameter('t_i', t_i)
if t_d is None:
t_d = self._n_set_points * [0.]
self._set_tuning_parameter('t_d', t_d)
self._proportional_on_process_value = proportional_on_process_value
self._derivative_on_process_value = derivative_on_process_value
self._p_band = False
self._anti_windup = None
self._set_point = ca.DM.zeros(self._n_set_points)
self._solution = TimeSeries(plot_backend, parent=self)
def _update_type(self) -> None:
self._type = 'PID'
def _set_tuning_parameter(self, which: str, value: Union[Numeric, Array]) -> None:
:param which:
:param value:
# TODO: Dimensions
value = convert(value, np.ndarray, shape=(self._n_set_points, self._n_set_points))
if not is_diagonal(value):
raise ValueError(f"The number of set points is greater than 1, but the supplied matrix for "
f"{dict({'k_p': 'K_P', 't_i': 'T_I', 't_d': 'T_D'})[which]} is not a diagonal matrix. "
f"Coupled multi-variable control is not supported at the moment.")
if which == 'k_p':
self._k_p = np.diag(value)
elif which == 't_i':
self._t_i = np.diag(value)
elif which == 't_d':
self._t_d = np.diag(value)
def _append_process_value(self, process_value: ca.DM) -> None:
:param process_value:
if self._solution.get_by_id('x').is_empty():
process_value_vector = ca.DM.zeros(self._n_set_points, 3)
process_value_vector = ca.reshape(self._solution.get_by_id('x:f'), self._n_set_points, 3)
process_value_vector[:, :-1] = process_value_vector[:, 1:]
process_value_vector[:, -1] = process_value
self._solution.add('x', process_value_vector.reshape((3 * self._n_set_points, -1)))
def _append_set_point(self) -> None:
if self._solution.get_by_id('p').is_empty():
set_point_vector = ca.DM.zeros(self._n_set_points, 3)
params = self._solution.get_by_id('p:f')
set_point_vector = ca.reshape(params[:3 * self._n_set_points], self._n_set_points, 3)
set_point_vector[:, :-1] = set_point_vector[:, 1:]
set_point_vector[:, -1] = self._set_point
params = ca.vertcat(set_point_vector.reshape((3 * self._n_set_points, -1)), self._k_p, self._t_i, self._t_d)
self._solution.add('p', params)
def _initialize_controller_output(self) -> None:
u = ca.DM.zeros(self._n_set_points)
self._solution.set('u', u)
def proportional_gain(self) -> np.ndarray:
return np.diag(self._k_p)
def proportional_gain(self, k_p: Union[Numeric, np.ndarray]) -> None:
self._set_tuning_parameter('k_p', k_p)
k_p = proportional_gain
def integral_time(self) -> np.ndarray:
return np.diag(self._t_i)
def integral_time(self, t_i: Union[Numeric, np.ndarray]) -> None:
self._set_tuning_parameter('t_i', t_i)
t_i = integral_time
def integral_gain(self) -> np.ndarray:
return np.diag(self._k_p / self._t_i)
k_i = integral_gain
def derivative_time(self) -> np.ndarray:
return np.diag(self._t_d)
def derivative_time(self, t_d: Union[Numeric, np.ndarray]) -> None:
self._set_tuning_parameter('t_d', t_d)
t_d = derivative_time
def derivative_gain(self) -> np.ndarray:
return np.diag(self._k_p * self._t_d)
k_d = derivative_gain
def set_point(self) -> ca.DM:
return self._set_point
def set_point(self, set_point: Union[Numeric, np.ndarray]) -> None:
set_point = convert(set_point, ca.DM)
if self._n_set_points > 1:
if set_point.size1() == 1:
set_point = ca.repmat(set_point, self._n_set_points, 1)
elif set_point.size1() != self._n_set_points:
raise ValueError(f"Dimension mismatch. Supplied dimension for the set point is "
f"{set_point.size1()}x{set_point.size2()}, but required dimension is "
self._set_point = set_point
s_p = set_point
[docs] def setup(self, **kwargs) -> None:
:param kwargs:
dt = kwargs.get('dt')
if dt is None:
dt = 1.
process_value = ca.SX.sym('pv', self._n_set_points, 3)
set_point = ca.SX.sym('sp', self._n_set_points, 3)
error = set_point - process_value
output = ca.SX.sym('u', self._n_set_points)
delta_output = 0.
# TODO: Dimensions
k_p = ca.SX.sym('k_p', self._n_set_points)
t_i = ca.SX.sym('t_i', self._n_set_points)
t_d = ca.SX.sym('t_d', self._n_set_points)
k_p_matrix = ca.diag(k_p)
t_i_matrix_inv = ca.diag(1. / t_i)
t_d_matrix = ca.diag(t_d)
if self._p_band:
raise NotImplementedError("The use of the proportional band has not been implemented yet.")
if self._proportional_on_process_value:
delta_output -= process_value[:, -1] - process_value[:, -2]
delta_output += error[:, -1] - error[:, -2]
delta_output += dt * t_i_matrix_inv @ error[:, -1]
# TODO: Add anti-windup
if self._derivative_on_process_value:
delta_output -= (t_d_matrix / dt) @ (process_value[:, -1] - 2 * process_value[:, -2] + process_value[:, -3])
delta_output += (t_d_matrix / dt) @ (error[:, -1] - 2 * error[:, -2] + error[:, -3])
# TODO: Add anti-windup
parameters = ca.vertcat(set_point.reshape((3 * self._n_set_points, -1)), k_p, t_i, t_d)
self._function = ca.Function(
[process_value.reshape((3 * self._n_set_points, -1)), ca.vertcat(output, parameters)],
[error, output + k_p_matrix @ delta_output],
['x0', 'p'],
['error', 'yf']
n_pv = process_value.numel()
n_y = output.numel()
n_sp = set_point.numel()
n_k_p = k_p.numel()
n_t_i = t_i.numel()
n_t_d = t_d.numel()
n_p = n_sp + n_k_p + n_t_i + n_t_d # same as parameters.numel()
names = ['dt', 'x', 'u', 'p'] # 'y'
vector = {
'dt': dt,
'x': {
'values_or_names': [pv.name() for pv in process_value.elements()],
'description': n_pv * [''],
'labels': n_pv * ['process value'],
'units': n_pv * [''],
'shape': (n_pv, 0),
'data_format': ca.DM
'u': {
'values_or_names': [out.name() for out in output.elements()],
'description': n_y * [''],
'labels': n_y * ['controller output'],
'units': n_y * [''],
'shape': (n_y, 0),
'data_format': ca.DM
'p': {
'values_or_names': [par.name() for par in parameters.elements()],
'description': n_p * [''],
'labels': n_sp * ['set point'] + n_k_p * ['K_P'] + n_t_i * ['T_I'] + n_t_d * ['T_D'],
'units': n_p * [''],
'shape': (n_p, 0),
'data_format': ca.DM
self._solution.setup(*names, **vector)
[docs] def call(self, *args, **kwargs):
:param args:
:param kwargs:
if self._function is None:
type_ = 'P'
if self._t_i != np.inf:
type_ += 'I'
if self._t_d != 0.:
type_ += 'D'
raise RuntimeError(f"{type_} controller is not set up. Run PID.setup() before calling the {type_} "
pv = kwargs.get('pv')
if pv is not None:
pv = convert(pv, ca.DM)
pv = ca.DM.zeros(self._n_set_points, 1)
if self._solution.get_by_id('u').is_empty():
args = self._solution.get_function_args()
result = self._function(**args)
return result['yf']
__all__ = [