#
# This file is part of HILO-MPC
#
# HILO-MPC is a toolbox for easy, flexible and fast development of machine-learning-supported
# optimal control and estimation problems
#
# Copyright (c) 2021 Johannes Pohlodek, Bruno Morabito, Rolf Findeisen
# All rights reserved
#
# HILO-MPC is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# HILO-MPC is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with HILO-MPC. If not, see <http://www.gnu.org/licenses/>.
#
from typing import Any, Callable, Optional, TypeVar, Union
import warnings
import casadi as ca
import numpy as np
from ..base import LearningBase
from ....plugins.plugins import LearningManager, LearningVisualizationManager, check_version
from ....util.data import DataSet
from ....util.machine_learning import net_to_casadi_graph
from ....util.util import is_list_like
ML = TypeVar('ML', bound=LearningBase)
class ArtificialNeuralNetwork(LearningBase):
    """Artificial neural network class

    Wraps a machine learning backend (PyTorch by default) for training a multilayer perceptron and exposes the
    trained network as a CasADi graph, so it can be embedded in optimal control and estimation problems.
def __init__(self, features, labels, id=None, name=None, **kwargs):
"""Constructor method"""
super().__init__(features, labels, id=id, name=name)
self._layers = []
self._weights = None
self._bias = None
self._dropouts = None
self._hidden = None
self._seed = kwargs.get('seed')
        learning_rate = kwargs.get('learning_rate')
        self._learning_rate = .001 if learning_rate is None else learning_rate
        loss = kwargs.get('loss')
        self._loss = 'mse' if loss is None else loss
        optimizer = kwargs.get('optimizer')
        self._optimizer = 'adam' if optimizer is None else optimizer
        metric = kwargs.get('metric')
        self._metric = [] if metric is None else metric
        self._data_sets = []
        train_backend = kwargs.get('backend')
        self._set_backend('pytorch' if train_backend is None else train_backend)
self._net = None
self._check_data_sets()
self._scaler_x = None
self._scaler_y = None
self._train_data = (None, None)
self._validate_data = (None, None)
self._test_data = (None, None)
self._pandas_version_checked = False
@staticmethod
def _set_scaling(scaler: Union[str, Callable], backend: Optional[str] = None) -> Any:
"""
:param scaler:
:param backend:
:return:
"""
# TODO: Can we use something other than Any for typing the returned value?
if isinstance(scaler, str):
if backend is not None:
scaler = LearningManager(backend).setup(scaler)
else:
warnings.warn("Scaling was selected, but no scaler backend was supplied. Please select a scaler backend"
" or supply your own scaler object. No scaling applied.")
return None
if callable(scaler):
scaler = scaler()
has_fit = hasattr(scaler, 'fit') and callable(scaler.fit)
if not has_fit:
raise RuntimeError("Supplied scaler is missing the method 'fit'") # TODO: Return a little more info
has_transform = hasattr(scaler, 'transform') and callable(scaler.transform)
if not has_transform:
raise RuntimeError("Supplied scaler is missing the method 'transform'") # TODO: Return a little more info
return scaler
@staticmethod
def _check_scaler(scaler: Any) -> None:
"""
:param scaler:
:return:
"""
# TODO: See self._set_scaling (regarding Any)
if not hasattr(scaler, 'mean_'):
# TODO: Return a little more info
raise RuntimeError("Supplied scaler is missing the attribute 'mean_'")
if not hasattr(scaler, 'scale_'):
# TODO: Return a little more info
raise RuntimeError("Supplied scaler is missing the attribute 'scale_'")
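    # Example (assuming scikit-learn is available): 'sklearn.preprocessing.StandardScaler' passes both checks, since
    # it provides 'fit' and 'transform' and, once fitted, exposes 'mean_' and 'scale_'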
def _check_data_sets(self, data_sets=None):
"""
:param data_sets:
:return:
"""
if data_sets is None:
data_sets = self._data_sets
if not is_list_like(data_sets):
data_sets = [data_sets]
for data_set in data_sets:
if isinstance(data_set, DataSet):
for feature in self._features:
if feature not in data_set.features:
raise ValueError(f"Feature {feature} does not exist in the supplied data set")
for label in self._labels:
if label not in data_set.labels:
raise ValueError(f"Label {label} does not exist in the supplied data set")
else:
# NOTE: Right now only pandas dataframes are supported
if not self._pandas_version_checked:
check_version('pandas')
self._pandas_version_checked = True
for feature in self._features:
if feature not in data_set.columns:
raise ValueError(f"Feature {feature} does not exist in the supplied data set")
for label in self._labels:
if label not in data_set.columns:
raise ValueError(f"Label {label} does not exist in the supplied data set")
@LearningBase.features.setter
def features(self, arg):
self._features = arg
self._n_features = len(arg)
self._check_data_sets()
@LearningBase.labels.setter
def labels(self, arg):
self._labels = arg
self._n_labels = len(arg)
self._check_data_sets()
@property
def seed(self):
"""
:return:
"""
return self._seed
@seed.setter
def seed(self, seed):
self._seed = seed
@property
def learning_rate(self):
"""
:return:
"""
return self._learning_rate
@learning_rate.setter
def learning_rate(self, lr):
self._learning_rate = lr
@property
def loss(self):
"""
:return:
"""
return self._loss
@loss.setter
def loss(self, loss):
self._loss = loss
@property
def optimizer(self):
"""
:return:
"""
return self._optimizer
@optimizer.setter
def optimizer(self, opti):
self._optimizer = opti
@property
def metric(self):
"""
:return:
"""
return self._metric
@metric.setter
def metric(self, metric):
self._metric = metric
@property
def depth(self):
"""
:return:
"""
return len([layer for layer in self._layers if layer.type.lower() != 'dropout'])
@property
def shape(self):
"""
:return:
"""
return (self._n_features,) + tuple(layer.nodes for layer in self._layers if layer.type.lower() != 'dropout') + (
self._n_labels,)
@property
def backend(self):
"""
:return:
"""
return self._backend.backend
@backend.setter
def backend(self, backend):
self._set_backend(backend)
    def add_layers(self, layers):
        """Add one or more layers to the network

        :param layers: a single layer object or a list, tuple or set of layer objects
        :return:
        """
if isinstance(layers, (list, tuple, set)):
for layer in layers:
self.add_layers(layer)
else:
self._layers.append(layers)
layers.parent = self
    def add_data_set(self, data_set):
        """Add a data set for training

        :param data_set: a DataSet object or a pandas DataFrame containing all features and labels
        :return:
        """
self._check_data_sets(data_set)
self._data_sets.append(data_set)
    def build_graph(self, weights=None, bias=None):
        """Build the CasADi graph of the network

        If no weights and biases are supplied, they are taken from the trained backend model.

        :param weights: weights of the network, if already available
        :param bias: biases of the network, if already available
        :return:
        """
x = ca.SX.sym('x', self._n_features)
if weights is None and bias is None:
self._function = self._net.build_graph(x, self._layers, input_scaling=self._scaler_x,
output_scaling=self._scaler_y)
else:
self._function = net_to_casadi_graph({'weights': weights, 'bias': bias}, x, self._layers,
input_scaling=self._scaler_x, output_scaling=self._scaler_y)
    def prepare_data_set(
self,
train_split: float = 1.,
validation_split: float = 0.,
scale_data: bool = False,
scaler: Optional[str] = None,
scaler_backend: Optional[str] = None,
shuffle: bool = True
) -> None:
"""
:param train_split:
:param validation_split:
:param scale_data:
:param scaler:
:param scaler_backend:
:param shuffle:
:return:
"""
data = self._data_sets[0].append(self._data_sets[1:], ignore_index=True, sort=False)
if isinstance(data, DataSet):
x_data, y_data = data.raw_data
# TODO: Make this the default in the future, so we don't have to transpose (therefore transpose for pandas
# objects)
x_data = x_data.T
y_data = y_data.T
# NOTE: I don't think at the moment that a length check is necessary here
else:
# NOTE: pandas is assumed here for now
x_data = data[self._features].values
y_data = data[self._labels].values
if len(x_data) != len(y_data):
raise ValueError(f"Dimension mismatch. Features have {len(x_data)} entries and labels have "
f"{len(y_data)} entries.")
if scale_data or scaler is not None or scaler_backend is not None:
if scaler is None:
scaler = 'StandardScaler'
self.set_scaling(scaler, backend=scaler_backend)
if self._scaler_x is not None:
self._scaler_x.fit(x_data)
self._check_scaler(self._scaler_x)
x_data = self._scaler_x.transform(x_data)
if self._scaler_y is not None:
self._scaler_y.fit(y_data)
self._check_scaler(self._scaler_y)
y_data = self._scaler_y.transform(y_data)
# TODO: Add support for data split directly in DataSet class
data_set_size = len(data)
indices = list(range(data_set_size))
if shuffle:
np.random.seed(self._seed)
np.random.shuffle(indices)
        # NOTE: Since the split boundaries are truncated to integers, the same split percentages can result in
        #   slightly different split sizes for data set sizes that are not evenly divisible
train, validate, test = np.split(indices, [int(train_split * data_set_size),
int((train_split + validation_split) * data_set_size)])
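        # Example (illustrative numbers): with 100 samples, train_split=.7 and validation_split=.2, the index list is
        # split at positions 70 and 90, yielding 70 training, 20 validation and 10 test samples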
x_train = x_data[train, :]
y_train = y_data[train, :]
self._train_data = (x_train, y_train)
x_validate = x_data[validate, :]
y_validate = y_data[validate, :]
self._validate_data = (x_validate, y_validate)
x_test = x_data[test, :]
y_test = y_data[test, :]
self._test_data = (x_test, y_test)
    def set_output_scaling(self, scaler: str, backend: Optional[str] = None) -> None:
        """Set the scaler used for the label (output) data

        :param scaler: name of a scaler class or a callable returning a scaler object
        :param backend: name of the scaler backend
        :return:
        """
self._scaler_y = self._set_scaling(scaler, backend=backend)
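
    # NOTE: 'set_scaling' below relies on 'set_input_scaling'; a minimal sketch of the counterpart to
    #   'set_output_scaling', assuming the input scaler is stored analogously to the output scaler
    def set_input_scaling(self, scaler: str, backend: Optional[str] = None) -> None:
        """Set the scaler used for the feature (input) data

        :param scaler: name of a scaler class or a callable returning a scaler object
        :param backend: name of the scaler backend
        :return:
        """
        self._scaler_x = self._set_scaling(scaler, backend=backend)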
    def set_scaling(self, scaler: Union[str, Callable], backend: Optional[str] = None) -> None:
        """Set the same scaler for both the input and the output data

        :param scaler: name of a scaler class or a callable returning a scaler object
        :param backend: name of the scaler backend; required if 'scaler' is given as a string
        :return:
        """
if isinstance(scaler, str) and backend is not None:
scaler = LearningManager(backend).setup(scaler)
self.set_input_scaling(scaler)
self.set_output_scaling(scaler)
    def setup(self, **kwargs):
        """Set up the network model in the selected training backend

        The keyword arguments 'loss', 'optimizer' and 'metric' override the values stored on the instance, while
        'show_tensorboard', 'save_tensorboard', 'tensorboard_log_dir', 'browser' and 'device' configure the training
        visualization and the hardware to train on.

        :param kwargs:
        :return:
        """
loss = kwargs.get('loss')
if loss is None:
loss = self._loss
else:
self._loss = loss
optimizer = kwargs.get('optimizer')
if optimizer is None:
optimizer = self._optimizer
else:
self._optimizer = optimizer
metric = kwargs.get('metric')
if metric is None:
metric = self._metric
else:
self._metric = metric
        show_tensorboard = bool(kwargs.get('show_tensorboard', False))
        if show_tensorboard:
            # Showing the tensorboard implies saving it
            save_tensorboard = True
        else:
            save_tensorboard = bool(kwargs.get('save_tensorboard', False))
tensorboard_log_dir = kwargs.get('tensorboard_log_dir')
if save_tensorboard:
tensorboard = LearningVisualizationManager('tensorboard').setup(log_dir=tensorboard_log_dir)
else:
tensorboard = None
if show_tensorboard:
browser = kwargs.get('browser')
if browser is None:
browser = 'chrome'
else:
browser = None
device = kwargs.get('device')
properties = [self._n_features, self._n_labels, self._layers]
options = {
'seed': self._seed,
'learning_rate': self._learning_rate,
'loss': loss,
'optimizer': optimizer,
'metric': metric,
'tensorboard': tensorboard,
'browser': browser,
'device': device
}
self._net = self._backend.setup('MLP', *properties, **options)
    def is_setup(self):
        """Check whether the network model has been set up in the backend

        :return: True if the 'setup' method was already executed, False otherwise
        """
        return self._net is not None
    def train(
self,
batch_size: int,
epochs: int,
verbose: int = 1,
validation_split: float = 0.,
test_split: float = 0.,
scale_data: bool = False,
scaler: Optional[str] = None,
scaler_backend: Optional[str] = None,
shuffle: bool = True,
patience: Optional[int] = None
) -> None:
"""
:param batch_size:
:param epochs:
:param verbose:
:param validation_split:
:param test_split:
:param scale_data:
:param scaler:
:param scaler_backend:
:param shuffle:
:param patience:
:return:
"""
if not self._data_sets:
raise RuntimeError("No data set to train on was supplied. Please add a data set by using the 'add_data_set'"
" method.")
if validation_split > 1. or validation_split < 0.:
raise ValueError("Validation split has to be between 0 and 1")
if test_split > 1. or test_split < 0.:
raise ValueError("Test split has to be between 0 and 1")
train_split = 1. - validation_split - test_split
if train_split <= 0.:
raise ValueError("Train split is not big enough. Reduce validation split or test split.")
self.prepare_data_set(train_split=train_split, validation_split=validation_split, scale_data=scale_data,
scaler=scaler, scaler_backend=scaler_backend, shuffle=shuffle)
self._net.train(self._train_data, self._validate_data, batch_size, epochs, verbose, patience, shuffle)
if all(data.size > 0 for data in self._test_data):
print("Evaluate on test data")
self._net.evaluate(data=self._test_data, batch_size=batch_size, verbose=verbose)
self.build_graph()
    def is_trained(self):
        """Check whether the artificial neural network has been trained

        :return: True if training has finished and the prediction graph was built, False otherwise
        """
        return super().is_setup()
    def predict(self, X_query):
        """Evaluate the trained network at the supplied query point

        :param X_query: query point; numerical values or CasADi symbols in the shape of the feature vector
        :return: prediction of the network at the query point
        """
# TODO: We could also add a predict method to the wrapper and just link here, but this would not work with
# CasADi variables like this
return self._function(X_query)[0]
    def show_tensorboard(self, browser=None):
        """Show the tensorboard of the trained network in a browser

        :param browser: name of the browser used to show the tensorboard (defaults to 'chrome')
        :return:
        """
if self.is_trained():
if browser is None:
browser = 'chrome'
self._net.show_tensorboard(browser.lower())
else:
warnings.warn("The artificial neural network has not been trained yet. Aborting...")
    def close_tensorboard(self):
        """Close a running tensorboard

        :return:
        """
if self.is_trained():
self._net.close_tensorboard()
__all__ = [
'ArtificialNeuralNetwork'
]