diff --git a/examples/benchmarks/TFT/data_formatters/__init__.py b/examples/benchmarks/TFT/data_formatters/__init__.py new file mode 100644 index 000000000..9a1980462 --- /dev/null +++ b/examples/benchmarks/TFT/data_formatters/__init__.py @@ -0,0 +1,15 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/examples/benchmarks/TFT/data_formatters/base.py b/examples/benchmarks/TFT/data_formatters/base.py new file mode 100644 index 000000000..f4ce2764f --- /dev/null +++ b/examples/benchmarks/TFT/data_formatters/base.py @@ -0,0 +1,235 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Default data formatting functions for experiments. + +For new datasets, inherit form GenericDataFormatter and implement +all abstract functions. + +These dataset-specific methods: +1) Define the column and input types for tabular dataframes used by model +2) Perform the necessary input feature engineering & normalisation steps +3) Reverts the normalisation for predictions +4) Are responsible for train, validation and test splits + + +""" + +import abc +import enum + + +# Type defintions +class DataTypes(enum.IntEnum): + """Defines numerical types of each column.""" + REAL_VALUED = 0 + CATEGORICAL = 1 + DATE = 2 + + +class InputTypes(enum.IntEnum): + """Defines input types of each column.""" + TARGET = 0 + OBSERVED_INPUT = 1 + KNOWN_INPUT = 2 + STATIC_INPUT = 3 + ID = 4 # Single column used as an entity identifier + TIME = 5 # Single column exclusively used as a time index + + +class GenericDataFormatter(abc.ABC): + """Abstract base class for all data formatters. + + User can implement the abstract methods below to perform dataset-specific + manipulations. + + """ + + @abc.abstractmethod + def set_scalers(self, df): + """Calibrates scalers using the data supplied.""" + raise NotImplementedError() + + @abc.abstractmethod + def transform_inputs(self, df): + """Performs feature transformation.""" + raise NotImplementedError() + + @abc.abstractmethod + def format_predictions(self, df): + """Reverts any normalisation to give predictions in original scale.""" + raise NotImplementedError() + + @abc.abstractmethod + def split_data(self, df): + """Performs the default train, validation and test splits.""" + raise NotImplementedError() + + @property + @abc.abstractmethod + def _column_definition(self): + """Defines order, input type and data type of each column.""" + raise NotImplementedError() + + @abc.abstractmethod + def get_fixed_params(self): + """Defines the fixed parameters used by the model for training. + + Requires the following keys: + 'total_time_steps': Defines the total number of time steps used by TFT + 'num_encoder_steps': Determines length of LSTM encoder (i.e. history) + 'num_epochs': Maximum number of epochs for training + 'early_stopping_patience': Early stopping param for keras + 'multiprocessing_workers': # of cpus for data processing + + + Returns: + A dictionary of fixed parameters, e.g.: + + fixed_params = { + 'total_time_steps': 252 + 5, + 'num_encoder_steps': 252, + 'num_epochs': 100, + 'early_stopping_patience': 5, + 'multiprocessing_workers': 5, + } + """ + raise NotImplementedError + + # Shared functions across data-formatters + @property + def num_classes_per_cat_input(self): + """Returns number of categories per relevant input. + + This is seqeuently required for keras embedding layers. + """ + return self._num_classes_per_cat_input + + def get_num_samples_for_calibration(self): + """Gets the default number of training and validation samples. + + Use to sub-sample the data for network calibration and a value of -1 uses + all available samples. + + Returns: + Tuple of (training samples, validation samples) + """ + return -1, -1 + + def get_column_definition(self): + """"Returns formatted column definition in order expected by the TFT.""" + + column_definition = self._column_definition + + # Sanity checks first. + # Ensure only one ID and time column exist + def _check_single_column(input_type): + + length = len([tup for tup in column_definition if tup[2] == input_type]) + + if length != 1: + raise ValueError('Illegal number of inputs ({}) of type {}'.format( + length, input_type)) + + _check_single_column(InputTypes.ID) + _check_single_column(InputTypes.TIME) + + identifier = [tup for tup in column_definition if tup[2] == InputTypes.ID] + time = [tup for tup in column_definition if tup[2] == InputTypes.TIME] + real_inputs = [ + tup for tup in column_definition if tup[1] == DataTypes.REAL_VALUED and + tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + categorical_inputs = [ + tup for tup in column_definition if tup[1] == DataTypes.CATEGORICAL and + tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + + return identifier + time + real_inputs + categorical_inputs + + def _get_input_columns(self): + """Returns names of all input columns.""" + return [ + tup[0] + for tup in self.get_column_definition() + if tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + + def _get_tft_input_indices(self): + """Returns the relevant indexes and input sizes required by TFT.""" + + # Functions + def _extract_tuples_from_data_type(data_type, defn): + return [ + tup for tup in defn if tup[1] == data_type and + tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + + def _get_locations(input_types, defn): + return [i for i, tup in enumerate(defn) if tup[2] in input_types] + + # Start extraction + column_definition = [ + tup for tup in self.get_column_definition() + if tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + + categorical_inputs = _extract_tuples_from_data_type(DataTypes.CATEGORICAL, + column_definition) + real_inputs = _extract_tuples_from_data_type(DataTypes.REAL_VALUED, + column_definition) + + locations = { + 'input_size': + len(self._get_input_columns()), + 'output_size': + len(_get_locations({InputTypes.TARGET}, column_definition)), + 'category_counts': + self.num_classes_per_cat_input, + 'input_obs_loc': + _get_locations({InputTypes.TARGET}, column_definition), + 'static_input_loc': + _get_locations({InputTypes.STATIC_INPUT}, column_definition), + 'known_regular_inputs': + _get_locations({InputTypes.STATIC_INPUT, InputTypes.KNOWN_INPUT}, + real_inputs), + 'known_categorical_inputs': + _get_locations({InputTypes.STATIC_INPUT, InputTypes.KNOWN_INPUT}, + categorical_inputs), + } + + return locations + + def get_experiment_params(self): + """Returns fixed model parameters for experiments.""" + + required_keys = [ + 'total_time_steps', 'num_encoder_steps', 'num_epochs', + 'early_stopping_patience', 'multiprocessing_workers' + ] + + fixed_params = self.get_fixed_params() + + for k in required_keys: + if k not in fixed_params: + raise ValueError('Field {}'.format(k) + + ' missing from fixed parameter definitions!') + + fixed_params['column_definition'] = self.get_column_definition() + + fixed_params.update(self._get_tft_input_indices()) + + return fixed_params diff --git a/examples/benchmarks/TFT/data_formatters/electricity.py b/examples/benchmarks/TFT/data_formatters/electricity.py new file mode 100644 index 000000000..062a77eb2 --- /dev/null +++ b/examples/benchmarks/TFT/data_formatters/electricity.py @@ -0,0 +1,261 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Custom formatting functions for Electricity dataset. + +Defines dataset specific column definitions and data transformations. Uses +entity specific z-score normalization. +""" + +import data_formatters.base +import libs.utils as utils +import pandas as pd +import sklearn.preprocessing + +GenericDataFormatter = data_formatters.base.GenericDataFormatter +DataTypes = data_formatters.base.DataTypes +InputTypes = data_formatters.base.InputTypes + + +class ElectricityFormatter(GenericDataFormatter): + """Defines and formats data for the electricity dataset. + + Note that per-entity z-score normalization is used here, and is implemented + across functions. + + Attributes: + column_definition: Defines input and data type of column used in the + experiment. + identifiers: Entity identifiers used in experiments. + """ + + _column_definition = [ + ('id', DataTypes.REAL_VALUED, InputTypes.ID), + ('hours_from_start', DataTypes.REAL_VALUED, InputTypes.TIME), + ('power_usage', DataTypes.REAL_VALUED, InputTypes.TARGET), + ('hour', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('day_of_week', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('hours_from_start', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('categorical_id', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ] + + def __init__(self): + """Initialises formatter.""" + + self.identifiers = None + self._real_scalers = None + self._cat_scalers = None + self._target_scaler = None + self._num_classes_per_cat_input = None + self._time_steps = self.get_fixed_params()['total_time_steps'] + + def split_data(self, df, valid_boundary=1315, test_boundary=1339): + """Splits data frame into training-validation-test data frames. + + This also calibrates scaling object, and transforms data for each split. + + Args: + df: Source data frame to split. + valid_boundary: Starting year for validation data + test_boundary: Starting year for test data + + Returns: + Tuple of transformed (train, valid, test) data. + """ + + print('Formatting train-valid-test splits.') + + index = df['days_from_start'] + train = df.loc[index < valid_boundary] + valid = df.loc[(index >= valid_boundary - 7) & (index < test_boundary)] + test = df.loc[index >= test_boundary - 7] + + self.set_scalers(train) + + return (self.transform_inputs(data) for data in [train, valid, test]) + + def set_scalers(self, df): + """Calibrates scalers using the data supplied. + + Args: + df: Data to use to calibrate scalers. + """ + print('Setting scalers with training data...') + + column_definitions = self.get_column_definition() + id_column = utils.get_single_col_by_input_type(InputTypes.ID, + column_definitions) + target_column = utils.get_single_col_by_input_type(InputTypes.TARGET, + column_definitions) + + # Format real scalers + real_inputs = utils.extract_cols_from_data_type( + DataTypes.REAL_VALUED, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + # Initialise scaler caches + self._real_scalers = {} + self._target_scaler = {} + identifiers = [] + for identifier, sliced in df.groupby(id_column): + + if len(sliced) >= self._time_steps: + + data = sliced[real_inputs].values + targets = sliced[[target_column]].values + self._real_scalers[identifier] \ + = sklearn.preprocessing.StandardScaler().fit(data) + + self._target_scaler[identifier] \ + = sklearn.preprocessing.StandardScaler().fit(targets) + identifiers.append(identifier) + + # Format categorical scalers + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + categorical_scalers = {} + num_classes = [] + for col in categorical_inputs: + # Set all to str so that we don't have mixed integer/string columns + srs = df[col].apply(str) + categorical_scalers[col] = sklearn.preprocessing.LabelEncoder().fit( + srs.values) + num_classes.append(srs.nunique()) + + # Set categorical scaler outputs + self._cat_scalers = categorical_scalers + self._num_classes_per_cat_input = num_classes + + # Extract identifiers in case required + self.identifiers = identifiers + + def transform_inputs(self, df): + """Performs feature transformations. + + This includes both feature engineering, preprocessing and normalisation. + + Args: + df: Data frame to transform. + + Returns: + Transformed data frame. + + """ + + if self._real_scalers is None and self._cat_scalers is None: + raise ValueError('Scalers have not been set!') + + # Extract relevant columns + column_definitions = self.get_column_definition() + id_col = utils.get_single_col_by_input_type(InputTypes.ID, + column_definitions) + real_inputs = utils.extract_cols_from_data_type( + DataTypes.REAL_VALUED, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + # Transform real inputs per entity + df_list = [] + for identifier, sliced in df.groupby(id_col): + + # Filter out any trajectories that are too short + if len(sliced) >= self._time_steps: + sliced_copy = sliced.copy() + sliced_copy[real_inputs] = self._real_scalers[identifier].transform( + sliced_copy[real_inputs].values) + df_list.append(sliced_copy) + + output = pd.concat(df_list, axis=0) + + # Format categorical inputs + for col in categorical_inputs: + string_df = df[col].apply(str) + output[col] = self._cat_scalers[col].transform(string_df) + + return output + + def format_predictions(self, predictions): + """Reverts any normalisation to give predictions in original scale. + + Args: + predictions: Dataframe of model predictions. + + Returns: + Data frame of unnormalised predictions. + """ + + if self._target_scaler is None: + raise ValueError('Scalers have not been set!') + + column_names = predictions.columns + + df_list = [] + for identifier, sliced in predictions.groupby('identifier'): + sliced_copy = sliced.copy() + target_scaler = self._target_scaler[identifier] + + for col in column_names: + if col not in {'forecast_time', 'identifier'}: + sliced_copy[col] = target_scaler.inverse_transform(sliced_copy[col]) + df_list.append(sliced_copy) + + output = pd.concat(df_list, axis=0) + + return output + + # Default params + def get_fixed_params(self): + """Returns fixed model parameters for experiments.""" + + fixed_params = { + 'total_time_steps': 8 * 24, + 'num_encoder_steps': 7 * 24, + 'num_epochs': 100, + 'early_stopping_patience': 5, + 'multiprocessing_workers': 5 + } + + return fixed_params + + def get_default_model_params(self): + """Returns default optimised model parameters.""" + + model_params = { + 'dropout_rate': 0.1, + 'hidden_layer_size': 160, + 'learning_rate': 0.001, + 'minibatch_size': 64, + 'max_gradient_norm': 0.01, + 'num_heads': 4, + 'stack_size': 1 + } + + return model_params + + def get_num_samples_for_calibration(self): + """Gets the default number of training and validation samples. + + Use to sub-sample the data for network calibration and a value of -1 uses + all available samples. + + Returns: + Tuple of (training samples, validation samples) + """ + return 450000, 50000 diff --git a/examples/benchmarks/TFT/data_formatters/favorita.py b/examples/benchmarks/TFT/data_formatters/favorita.py new file mode 100644 index 000000000..26fae632c --- /dev/null +++ b/examples/benchmarks/TFT/data_formatters/favorita.py @@ -0,0 +1,327 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Custom formatting functions for Favorita dataset. + +Defines dataset specific column definitions and data transformations. +""" + +import data_formatters.base +import libs.utils as utils +import pandas as pd +import sklearn.preprocessing + +DataTypes = data_formatters.base.DataTypes +InputTypes = data_formatters.base.InputTypes + + +class FavoritaFormatter(data_formatters.base.GenericDataFormatter): + """Defines and formats data for the Favorita dataset. + + Attributes: + column_definition: Defines input and data type of column used in the + experiment. + identifiers: Entity identifiers used in experiments. + """ + + _column_definition = [ + ('traj_id', DataTypes.REAL_VALUED, InputTypes.ID), + ('date', DataTypes.DATE, InputTypes.TIME), + ('log_sales', DataTypes.REAL_VALUED, InputTypes.TARGET), + ('onpromotion', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('transactions', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('oil', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('day_of_week', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('day_of_month', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('month', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('national_hol', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('regional_hol', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('local_hol', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('open', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('item_nbr', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('store_nbr', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('city', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('state', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('type', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('cluster', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('family', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('class', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ('perishable', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT) + ] + + def __init__(self): + """Initialises formatter.""" + + self.identifiers = None + self._real_scalers = None + self._cat_scalers = None + self._target_scaler = None + self._num_classes_per_cat_input = None + + def split_data(self, df, valid_boundary=None, test_boundary=None): + """Splits data frame into training-validation-test data frames. + + This also calibrates scaling object, and transforms data for each split. + + Args: + df: Source data frame to split. + valid_boundary: Starting year for validation data + test_boundary: Starting year for test data + + Returns: + Tuple of transformed (train, valid, test) data. + """ + + print('Formatting train-valid-test splits.') + + if valid_boundary is None: + valid_boundary = pd.datetime(2015, 12, 1) + + fixed_params = self.get_fixed_params() + time_steps = fixed_params['total_time_steps'] + lookback = fixed_params['num_encoder_steps'] + forecast_horizon = time_steps - lookback + + df['date'] = pd.to_datetime(df['date']) + df_lists = {'train': [], 'valid': [], 'test': []} + for _, sliced in df.groupby('traj_id'): + index = sliced['date'] + train = sliced.loc[index < valid_boundary] + train_len = len(train) + valid_len = train_len + forecast_horizon + valid = sliced.iloc[train_len - lookback:valid_len, :] + test = sliced.iloc[valid_len - lookback:valid_len + forecast_horizon, :] + + sliced_map = {'train': train, 'valid': valid, 'test': test} + + for k in sliced_map: + item = sliced_map[k] + + if len(item) >= time_steps: + df_lists[k].append(item) + + dfs = {k: pd.concat(df_lists[k], axis=0) for k in df_lists} + + train = dfs['train'] + self.set_scalers(train, set_real=True) + + # Use all data for label encoding to handle labels not present in training. + self.set_scalers(df, set_real=False) + + # Filter out identifiers not present in training (i.e. cold-started items). + def filter_ids(frame): + identifiers = set(self.identifiers) + index = frame['traj_id'] + return frame.loc[index.apply(lambda x: x in identifiers)] + + valid = filter_ids(dfs['valid']) + test = filter_ids(dfs['test']) + + return (self.transform_inputs(data) for data in [train, valid, test]) + + def set_scalers(self, df, set_real=True): + """Calibrates scalers using the data supplied. + + Label encoding is applied to the entire dataset (i.e. including test), + so that unseen labels can be handled at run-time. + + Args: + df: Data to use to calibrate scalers. + set_real: Whether to fit set real-valued or categorical scalers + """ + print('Setting scalers with training data...') + + column_definitions = self.get_column_definition() + id_column = utils.get_single_col_by_input_type(InputTypes.ID, + column_definitions) + target_column = utils.get_single_col_by_input_type(InputTypes.TARGET, + column_definitions) + + if set_real: + + # Extract identifiers in case required + self.identifiers = list(df[id_column].unique()) + + # Format real scalers + self._real_scalers = {} + for col in ['oil', 'transactions', 'log_sales']: + self._real_scalers[col] = (df[col].mean(), df[col].std()) + + self._target_scaler = (df[target_column].mean(), df[target_column].std()) + + else: + # Format categorical scalers + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + categorical_scalers = {} + num_classes = [] + if self.identifiers is None: + raise ValueError('Scale real-valued inputs first!') + id_set = set(self.identifiers) + valid_idx = df['traj_id'].apply(lambda x: x in id_set) + for col in categorical_inputs: + # Set all to str so that we don't have mixed integer/string columns + srs = df[col].apply(str).loc[valid_idx] + categorical_scalers[col] = sklearn.preprocessing.LabelEncoder().fit( + srs.values) + + num_classes.append(srs.nunique()) + + # Set categorical scaler outputs + self._cat_scalers = categorical_scalers + self._num_classes_per_cat_input = num_classes + + def transform_inputs(self, df): + """Performs feature transformations. + + This includes both feature engineering, preprocessing and normalisation. + + Args: + df: Data frame to transform. + + Returns: + Transformed data frame. + + """ + output = df.copy() + + if self._real_scalers is None and self._cat_scalers is None: + raise ValueError('Scalers have not been set!') + + column_definitions = self.get_column_definition() + + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + # Format real inputs + for col in ['log_sales', 'oil', 'transactions']: + mean, std = self._real_scalers[col] + output[col] = (df[col] - mean) / std + + if col == 'log_sales': + output[col] = output[col].fillna(0.) # mean imputation + + # Format categorical inputs + for col in categorical_inputs: + string_df = df[col].apply(str) + output[col] = self._cat_scalers[col].transform(string_df) + + return output + + def format_predictions(self, predictions): + """Reverts any normalisation to give predictions in original scale. + + Args: + predictions: Dataframe of model predictions. + + Returns: + Data frame of unnormalised predictions. + """ + output = predictions.copy() + + column_names = predictions.columns + mean, std = self._target_scaler + for col in column_names: + if col not in {'forecast_time', 'identifier'}: + output[col] = (predictions[col] * std) + mean + + return output + + # Default params + def get_fixed_params(self): + """Returns fixed model parameters for experiments.""" + + fixed_params = { + 'total_time_steps': 120, + 'num_encoder_steps': 90, + 'num_epochs': 100, + 'early_stopping_patience': 5, + 'multiprocessing_workers': 5 + } + + return fixed_params + + def get_default_model_params(self): + """Returns default optimised model parameters.""" + + model_params = { + 'dropout_rate': 0.1, + 'hidden_layer_size': 240, + 'learning_rate': 0.001, + 'minibatch_size': 128, + 'max_gradient_norm': 100., + 'num_heads': 4, + 'stack_size': 1 + } + + return model_params + + def get_num_samples_for_calibration(self): + """Gets the default number of training and validation samples. + + Use to sub-sample the data for network calibration and a value of -1 uses + all available samples. + + Returns: + Tuple of (training samples, validation samples) + """ + return 450000, 50000 + + def get_column_definition(self): + """"Formats column definition in order expected by the TFT. + + Modified for Favorita to match column order of original experiment. + + Returns: + Favorita-specific column definition + """ + + column_definition = self._column_definition + + # Sanity checks first. + # Ensure only one ID and time column exist + def _check_single_column(input_type): + + length = len([tup for tup in column_definition if tup[2] == input_type]) + + if length != 1: + raise ValueError('Illegal number of inputs ({}) of type {}'.format( + length, input_type)) + + _check_single_column(InputTypes.ID) + _check_single_column(InputTypes.TIME) + + identifier = [tup for tup in column_definition if tup[2] == InputTypes.ID] + time = [tup for tup in column_definition if tup[2] == InputTypes.TIME] + real_inputs = [ + tup for tup in column_definition if tup[1] == DataTypes.REAL_VALUED and + tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + + col_definition_map = {tup[0]: tup for tup in column_definition} + col_order = [ + 'item_nbr', 'store_nbr', 'city', 'state', 'type', 'cluster', 'family', + 'class', 'perishable', 'onpromotion', 'day_of_week', 'national_hol', + 'regional_hol', 'local_hol' + ] + categorical_inputs = [ + col_definition_map[k] for k in col_order if k in col_definition_map + ] + + return identifier + time + real_inputs + categorical_inputs diff --git a/examples/benchmarks/TFT/data_formatters/qlib_Alpha158.py b/examples/benchmarks/TFT/data_formatters/qlib_Alpha158.py new file mode 100644 index 000000000..aa081fb17 --- /dev/null +++ b/examples/benchmarks/TFT/data_formatters/qlib_Alpha158.py @@ -0,0 +1,220 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Custom formatting functions for Alpha158 dataset. + +Defines dataset specific column definitions and data transformations. +""" + +import data_formatters.base +import libs.utils as utils +import sklearn.preprocessing + +GenericDataFormatter = data_formatters.base.GenericDataFormatter +DataTypes = data_formatters.base.DataTypes +InputTypes = data_formatters.base.InputTypes + +class Alpha158Formatter(GenericDataFormatter): + """Defines and formats data for the Alpha158 dataset. + + Attributes: + column_definition: Defines input and data type of column used in the + experiment. + identifiers: Entity identifiers used in experiments. + """ + + _column_definition = [ + ('instrument', DataTypes.CATEGORICAL, InputTypes.ID), + ('LABEL0', DataTypes.REAL_VALUED, InputTypes.TARGET), + ('date', DataTypes.DATE, InputTypes.TIME), + ('month', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('day_of_week', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + # Selected 10 features + ('RESI5', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('WVMA5', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('RSQR5', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('KLEN', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('RSQR10', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('CORR5', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('CORD5', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('CORR10', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('ROC60', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('RESI10', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('const', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ] + + def __init__(self): + """Initialises formatter.""" + + self.identifiers = None + self._real_scalers = None + self._cat_scalers = None + self._target_scaler = None + self._num_classes_per_cat_input = None + + def split_data(self, df, valid_boundary=2016, test_boundary=2018): + """Splits data frame into training-validation-test data frames. + + This also calibrates scaling object, and transforms data for each split. + + Args: + df: Source data frame to split. + valid_boundary: Starting year for validation data + test_boundary: Starting year for test data + + Returns: + Tuple of transformed (train, valid, test) data. + """ + + print('Formatting train-valid-test splits.') + + index = df['year'] + train = df.loc[index < valid_boundary] + valid = df.loc[(index >= valid_boundary) & (index < test_boundary)] + test = df.loc[index >= test_boundary] + + self.set_scalers(train) + + return (self.transform_inputs(data) for data in [train, valid, test]) + + def set_scalers(self, df): + """Calibrates scalers using the data supplied. + + Args: + df: Data to use to calibrate scalers. + """ + print('Setting scalers with training data...') + + column_definitions = self.get_column_definition() + id_column = utils.get_single_col_by_input_type(InputTypes.ID, + column_definitions) + target_column = utils.get_single_col_by_input_type(InputTypes.TARGET, + column_definitions) + + # Extract identifiers in case required + self.identifiers = list(df[id_column].unique()) + + # Format real scalers + real_inputs = utils.extract_cols_from_data_type( + DataTypes.REAL_VALUED, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + data = df[real_inputs].values + self._real_scalers = sklearn.preprocessing.StandardScaler().fit(data) + self._target_scaler = sklearn.preprocessing.StandardScaler().fit( + df[[target_column]].values) # used for predictions + + # Format categorical scalers + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + categorical_scalers = {} + num_classes = [] + for col in categorical_inputs: + # Set all to str so that we don't have mixed integer/string columns + srs = df[col].apply(str) + categorical_scalers[col] = sklearn.preprocessing.LabelEncoder().fit( + srs.values) + num_classes.append(srs.nunique()) + + # Set categorical scaler outputs + self._cat_scalers = categorical_scalers + self._num_classes_per_cat_input = num_classes + + def transform_inputs(self, df): + """Performs feature transformations. + + This includes both feature engineering, preprocessing and normalisation. + + Args: + df: Data frame to transform. + + Returns: + Transformed data frame. + + """ + output = df.copy() + + if self._real_scalers is None and self._cat_scalers is None: + raise ValueError('Scalers have not been set!') + + column_definitions = self.get_column_definition() + + real_inputs = utils.extract_cols_from_data_type( + DataTypes.REAL_VALUED, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + # Format real inputs + output[real_inputs] = self._real_scalers.transform(df[real_inputs].values) + + # Format categorical inputs + for col in categorical_inputs: + string_df = df[col].apply(str) + output[col] = self._cat_scalers[col].transform(string_df) + + return output + + def format_predictions(self, predictions): + """Reverts any normalisation to give predictions in original scale. + + Args: + predictions: Dataframe of model predictions. + + Returns: + Data frame of unnormalised predictions. + """ + output = predictions.copy() + + column_names = predictions.columns + + for col in column_names: + if col not in {'forecast_time', 'identifier'}: + output[col] = self._target_scaler.inverse_transform(predictions[col]) + + return output + + # Default params + def get_fixed_params(self): + """Returns fixed model parameters for experiments.""" + + fixed_params = { + 'total_time_steps': 16 + 6, + 'num_encoder_steps': 16, + 'num_epochs': 100, + 'early_stopping_patience': 5, + 'multiprocessing_workers': 5, + } + + return fixed_params + + def get_default_model_params(self): + """Returns default optimised model parameters.""" + + model_params = { + 'dropout_rate': 0.3, + 'hidden_layer_size': 160, + 'learning_rate': 0.01, + 'minibatch_size': 64, + 'max_gradient_norm': 0.01, + 'num_heads': 1, + 'stack_size': 1 + } + + return model_params diff --git a/examples/benchmarks/TFT/data_formatters/traffic.py b/examples/benchmarks/TFT/data_formatters/traffic.py new file mode 100644 index 000000000..49401e5cc --- /dev/null +++ b/examples/benchmarks/TFT/data_formatters/traffic.py @@ -0,0 +1,117 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Custom formatting functions for Traffic dataset. + +Defines dataset specific column definitions and data transformations. This also +performs z-score normalization across the entire dataset, hence re-uses most of +the same functions as volatility. +""" + +import data_formatters.base +import data_formatters.volatility + +VolatilityFormatter = data_formatters.volatility.VolatilityFormatter +DataTypes = data_formatters.base.DataTypes +InputTypes = data_formatters.base.InputTypes + + +class TrafficFormatter(VolatilityFormatter): + """Defines and formats data for the traffic dataset. + + This also performs z-score normalization across the entire dataset, hence + re-uses most of the same functions as volatility. + + Attributes: + column_definition: Defines input and data type of column used in the + experiment. + identifiers: Entity identifiers used in experiments. + """ + + _column_definition = [ + ('id', DataTypes.REAL_VALUED, InputTypes.ID), + ('hours_from_start', DataTypes.REAL_VALUED, InputTypes.TIME), + ('values', DataTypes.REAL_VALUED, InputTypes.TARGET), + ('time_on_day', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('day_of_week', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('hours_from_start', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('categorical_id', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ] + + def split_data(self, df, valid_boundary=151, test_boundary=166): + """Splits data frame into training-validation-test data frames. + + This also calibrates scaling object, and transforms data for each split. + + Args: + df: Source data frame to split. + valid_boundary: Starting year for validation data + test_boundary: Starting year for test data + + Returns: + Tuple of transformed (train, valid, test) data. + """ + + print('Formatting train-valid-test splits.') + + index = df['sensor_day'] + train = df.loc[index < valid_boundary] + valid = df.loc[(index >= valid_boundary - 7) & (index < test_boundary)] + test = df.loc[index >= test_boundary - 7] + + self.set_scalers(train) + + return (self.transform_inputs(data) for data in [train, valid, test]) + + # Default params + def get_fixed_params(self): + """Returns fixed model parameters for experiments.""" + + fixed_params = { + 'total_time_steps': 8 * 24, + 'num_encoder_steps': 7 * 24, + 'num_epochs': 100, + 'early_stopping_patience': 5, + 'multiprocessing_workers': 5 + } + + return fixed_params + + def get_default_model_params(self): + """Returns default optimised model parameters.""" + + model_params = { + 'dropout_rate': 0.3, + 'hidden_layer_size': 320, + 'learning_rate': 0.001, + 'minibatch_size': 128, + 'max_gradient_norm': 100., + 'num_heads': 4, + 'stack_size': 1 + } + + return model_params + + def get_num_samples_for_calibration(self): + """Gets the default number of training and validation samples. + + Use to sub-sample the data for network calibration and a value of -1 uses + all available samples. + + Returns: + Tuple of (training samples, validation samples) + """ + return 450000, 50000 diff --git a/examples/benchmarks/TFT/data_formatters/volatility.py b/examples/benchmarks/TFT/data_formatters/volatility.py new file mode 100644 index 000000000..37923a275 --- /dev/null +++ b/examples/benchmarks/TFT/data_formatters/volatility.py @@ -0,0 +1,214 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Custom formatting functions for Volatility dataset. + +Defines dataset specific column definitions and data transformations. +""" + +import data_formatters.base +import libs.utils as utils +import sklearn.preprocessing + +GenericDataFormatter = data_formatters.base.GenericDataFormatter +DataTypes = data_formatters.base.DataTypes +InputTypes = data_formatters.base.InputTypes + + +class VolatilityFormatter(GenericDataFormatter): + """Defines and formats data for the volatility dataset. + + Attributes: + column_definition: Defines input and data type of column used in the + experiment. + identifiers: Entity identifiers used in experiments. + """ + + _column_definition = [ + ('Symbol', DataTypes.CATEGORICAL, InputTypes.ID), + ('date', DataTypes.DATE, InputTypes.TIME), + ('log_vol', DataTypes.REAL_VALUED, InputTypes.TARGET), + ('open_to_close', DataTypes.REAL_VALUED, InputTypes.OBSERVED_INPUT), + ('days_from_start', DataTypes.REAL_VALUED, InputTypes.KNOWN_INPUT), + ('day_of_week', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('day_of_month', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('week_of_year', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('month', DataTypes.CATEGORICAL, InputTypes.KNOWN_INPUT), + ('Region', DataTypes.CATEGORICAL, InputTypes.STATIC_INPUT), + ] + + def __init__(self): + """Initialises formatter.""" + + self.identifiers = None + self._real_scalers = None + self._cat_scalers = None + self._target_scaler = None + self._num_classes_per_cat_input = None + + def split_data(self, df, valid_boundary=2016, test_boundary=2018): + """Splits data frame into training-validation-test data frames. + + This also calibrates scaling object, and transforms data for each split. + + Args: + df: Source data frame to split. + valid_boundary: Starting year for validation data + test_boundary: Starting year for test data + + Returns: + Tuple of transformed (train, valid, test) data. + """ + + print('Formatting train-valid-test splits.') + + index = df['year'] + train = df.loc[index < valid_boundary] + valid = df.loc[(index >= valid_boundary) & (index < test_boundary)] + test = df.loc[index >= test_boundary] + + self.set_scalers(train) + + return (self.transform_inputs(data) for data in [train, valid, test]) + + def set_scalers(self, df): + """Calibrates scalers using the data supplied. + + Args: + df: Data to use to calibrate scalers. + """ + print('Setting scalers with training data...') + + column_definitions = self.get_column_definition() + id_column = utils.get_single_col_by_input_type(InputTypes.ID, + column_definitions) + target_column = utils.get_single_col_by_input_type(InputTypes.TARGET, + column_definitions) + + # Extract identifiers in case required + self.identifiers = list(df[id_column].unique()) + + # Format real scalers + real_inputs = utils.extract_cols_from_data_type( + DataTypes.REAL_VALUED, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + data = df[real_inputs].values + self._real_scalers = sklearn.preprocessing.StandardScaler().fit(data) + self._target_scaler = sklearn.preprocessing.StandardScaler().fit( + df[[target_column]].values) # used for predictions + + # Format categorical scalers + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + categorical_scalers = {} + num_classes = [] + for col in categorical_inputs: + # Set all to str so that we don't have mixed integer/string columns + srs = df[col].apply(str) + categorical_scalers[col] = sklearn.preprocessing.LabelEncoder().fit( + srs.values) + num_classes.append(srs.nunique()) + + # Set categorical scaler outputs + self._cat_scalers = categorical_scalers + self._num_classes_per_cat_input = num_classes + + def transform_inputs(self, df): + """Performs feature transformations. + + This includes both feature engineering, preprocessing and normalisation. + + Args: + df: Data frame to transform. + + Returns: + Transformed data frame. + + """ + output = df.copy() + + if self._real_scalers is None and self._cat_scalers is None: + raise ValueError('Scalers have not been set!') + + column_definitions = self.get_column_definition() + + real_inputs = utils.extract_cols_from_data_type( + DataTypes.REAL_VALUED, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + categorical_inputs = utils.extract_cols_from_data_type( + DataTypes.CATEGORICAL, column_definitions, + {InputTypes.ID, InputTypes.TIME}) + + # Format real inputs + output[real_inputs] = self._real_scalers.transform(df[real_inputs].values) + + # Format categorical inputs + for col in categorical_inputs: + string_df = df[col].apply(str) + output[col] = self._cat_scalers[col].transform(string_df) + + return output + + def format_predictions(self, predictions): + """Reverts any normalisation to give predictions in original scale. + + Args: + predictions: Dataframe of model predictions. + + Returns: + Data frame of unnormalised predictions. + """ + output = predictions.copy() + + column_names = predictions.columns + + for col in column_names: + if col not in {'forecast_time', 'identifier'}: + output[col] = self._target_scaler.inverse_transform(predictions[col]) + + return output + + # Default params + def get_fixed_params(self): + """Returns fixed model parameters for experiments.""" + + fixed_params = { + 'total_time_steps': 252 + 5, + 'num_encoder_steps': 252, + 'num_epochs': 100, + 'early_stopping_patience': 5, + 'multiprocessing_workers': 5, + } + + return fixed_params + + def get_default_model_params(self): + """Returns default optimised model parameters.""" + + model_params = { + 'dropout_rate': 0.3, + 'hidden_layer_size': 160, + 'learning_rate': 0.01, + 'minibatch_size': 64, + 'max_gradient_norm': 0.01, + 'num_heads': 1, + 'stack_size': 1 + } + + return model_params diff --git a/examples/benchmarks/TFT/expt_settings/__init__.py b/examples/benchmarks/TFT/expt_settings/__init__.py new file mode 100644 index 000000000..9a1980462 --- /dev/null +++ b/examples/benchmarks/TFT/expt_settings/__init__.py @@ -0,0 +1,15 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/examples/benchmarks/TFT/expt_settings/configs.py b/examples/benchmarks/TFT/expt_settings/configs.py new file mode 100644 index 000000000..d28a39bb0 --- /dev/null +++ b/examples/benchmarks/TFT/expt_settings/configs.py @@ -0,0 +1,111 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Default configs for TFT experiments. + +Contains the default output paths for data, serialised models and predictions +for the main experiments used in the publication. +""" + +import os + +import data_formatters.electricity +import data_formatters.favorita +import data_formatters.traffic +import data_formatters.volatility +import data_formatters.qlib_Alpha158 + + +class ExperimentConfig(object): + """Defines experiment configs and paths to outputs. + + Attributes: + root_folder: Root folder to contain all experimental outputs. + experiment: Name of experiment to run. + data_folder: Folder to store data for experiment. + model_folder: Folder to store serialised models. + results_folder: Folder to store results. + data_csv_path: Path to primary data csv file used in experiment. + hyperparam_iterations: Default number of random search iterations for + experiment. + """ + + default_experiments = ['volatility', 'electricity', 'traffic', 'favorita', 'Alpha158'] + + def __init__(self, experiment='volatility', root_folder=None): + """Creates configs based on default experiment chosen. + + Args: + experiment: Name of experiment. + root_folder: Root folder to save all outputs of training. + """ + + if experiment not in self.default_experiments: + raise ValueError('Unrecognised experiment={}'.format(experiment)) + + # Defines all relevant paths + if root_folder is None: + root_folder = os.path.join( + os.path.dirname(os.path.realpath(__file__)), '..', 'outputs') + print('Using root folder {}'.format(root_folder)) + + self.root_folder = root_folder + self.experiment = experiment + self.data_folder = os.path.join(root_folder, 'data', experiment) + self.model_folder = os.path.join(root_folder, 'saved_models', experiment) + self.results_folder = os.path.join(root_folder, 'results', experiment) + + # Creates folders if they don't exist + for relevant_directory in [ + self.root_folder, self.data_folder, self.model_folder, + self.results_folder + ]: + if not os.path.exists(relevant_directory): + os.makedirs(relevant_directory) + + @property + def data_csv_path(self): + csv_map = { + 'volatility': 'formatted_omi_vol.csv', + 'electricity': 'hourly_electricity.csv', + 'traffic': 'hourly_data.csv', + 'favorita': 'favorita_consolidated.csv', + 'Alpha158': 'Alpha158.csv', + } + + return os.path.join(self.data_folder, csv_map[self.experiment]) + + @property + def hyperparam_iterations(self): + + return 240 if self.experiment == 'volatility' else 60 + + def make_data_formatter(self): + """Gets a data formatter object for experiment. + + Returns: + Default DataFormatter per experiment. + """ + + data_formatter_class = { + 'volatility': data_formatters.volatility.VolatilityFormatter, + 'electricity': data_formatters.electricity.ElectricityFormatter, + 'traffic': data_formatters.traffic.TrafficFormatter, + 'favorita': data_formatters.favorita.FavoritaFormatter, + 'Alpha158': data_formatters.qlib_Alpha158.Alpha158Formatter, + } + + return data_formatter_class[self.experiment]() diff --git a/examples/benchmarks/TFT/libs/__init__.py b/examples/benchmarks/TFT/libs/__init__.py new file mode 100644 index 000000000..9a1980462 --- /dev/null +++ b/examples/benchmarks/TFT/libs/__init__.py @@ -0,0 +1,15 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/examples/benchmarks/TFT/libs/hyperparam_opt.py b/examples/benchmarks/TFT/libs/hyperparam_opt.py new file mode 100644 index 000000000..c9bc19e7c --- /dev/null +++ b/examples/benchmarks/TFT/libs/hyperparam_opt.py @@ -0,0 +1,438 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Classes used for hyperparameter optimisation. + +Two main classes exist: +1) HyperparamOptManager used for optimisation on a single machine/GPU. +2) DistributedHyperparamOptManager for multiple GPUs on different machines. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections +import os +import shutil +import libs.utils as utils +import numpy as np +import pandas as pd + +Deque = collections.deque + + +class HyperparamOptManager: + """Manages hyperparameter optimisation using random search for a single GPU. + + Attributes: + param_ranges: Discrete hyperparameter range for random search. + results: Dataframe of validation results. + fixed_params: Fixed model parameters per experiment. + saved_params: Dataframe of parameters trained. + best_score: Minimum validation loss observed thus far. + optimal_name: Key to best configuration. + hyperparam_folder: Where to save optimisation outputs. + """ + + def __init__(self, + param_ranges, + fixed_params, + model_folder, + override_w_fixed_params=True): + """Instantiates model. + + Args: + param_ranges: Discrete hyperparameter range for random search. + fixed_params: Fixed model parameters per experiment. + model_folder: Folder to store optimisation artifacts. + override_w_fixed_params: Whether to override serialsed fixed model + parameters with new supplied values. + """ + + self.param_ranges = param_ranges + + self._max_tries = 1000 + self.results = pd.DataFrame() + self.fixed_params = fixed_params + self.saved_params = pd.DataFrame() + + self.best_score = np.Inf + self.optimal_name = "" + + # Setup + # Create folder for saving if its not there + self.hyperparam_folder = model_folder + utils.create_folder_if_not_exist(self.hyperparam_folder) + + self._override_w_fixed_params = override_w_fixed_params + + def load_results(self): + """Loads results from previous hyperparameter optimisation. + + Returns: + A boolean indicating if previous results can be loaded. + """ + print("Loading results from", self.hyperparam_folder) + + results_file = os.path.join(self.hyperparam_folder, "results.csv") + params_file = os.path.join(self.hyperparam_folder, "params.csv") + + if os.path.exists(results_file) and os.path.exists(params_file): + + self.results = pd.read_csv(results_file, index_col=0) + self.saved_params = pd.read_csv(params_file, index_col=0) + + if not self.results.empty: + self.results.at["loss"] = self.results.loc["loss"].apply(float) + self.best_score = self.results.loc["loss"].min() + + is_optimal = self.results.loc["loss"] == self.best_score + self.optimal_name = self.results.T[is_optimal].index[0] + + return True + + return False + + def _get_params_from_name(self, name): + """Returns previously saved parameters given a key.""" + params = self.saved_params + + selected_params = dict(params[name]) + + if self._override_w_fixed_params: + for k in self.fixed_params: + selected_params[k] = self.fixed_params[k] + + return selected_params + + def get_best_params(self): + """Returns the optimal hyperparameters thus far.""" + + optimal_name = self.optimal_name + + return self._get_params_from_name(optimal_name) + + def clear(self): + """Clears all previous results and saved parameters.""" + shutil.rmtree(self.hyperparam_folder) + os.makedirs(self.hyperparam_folder) + self.results = pd.DataFrame() + self.saved_params = pd.DataFrame() + + def _check_params(self, params): + """Checks that parameter map is properly defined.""" + + valid_fields = list(self.param_ranges.keys()) + list( + self.fixed_params.keys()) + invalid_fields = [k for k in params if k not in valid_fields] + missing_fields = [k for k in valid_fields if k not in params] + + if invalid_fields: + raise ValueError("Invalid Fields Found {} - Valid ones are {}".format( + invalid_fields, valid_fields)) + if missing_fields: + raise ValueError("Missing Fields Found {} - Valid ones are {}".format( + missing_fields, valid_fields)) + + def _get_name(self, params): + """Returns a unique key for the supplied set of params.""" + + self._check_params(params) + + fields = list(params.keys()) + fields.sort() + + return "_".join([str(params[k]) for k in fields]) + + def get_next_parameters(self, ranges_to_skip=None): + """Returns the next set of parameters to optimise. + + Args: + ranges_to_skip: Explicitly defines a set of keys to skip. + """ + if ranges_to_skip is None: + ranges_to_skip = set(self.results.index) + + if not isinstance(self.param_ranges, dict): + raise ValueError("Only works for random search!") + + param_range_keys = list(self.param_ranges.keys()) + param_range_keys.sort() + + def _get_next(): + """Returns next hyperparameter set per try.""" + + parameters = { + k: np.random.choice(self.param_ranges[k]) for k in param_range_keys + } + + # Adds fixed params + for k in self.fixed_params: + parameters[k] = self.fixed_params[k] + + return parameters + + for _ in range(self._max_tries): + + parameters = _get_next() + name = self._get_name(parameters) + + if name not in ranges_to_skip: + return parameters + + raise ValueError("Exceeded max number of hyperparameter searches!!") + + def update_score(self, parameters, loss, model, info=""): + """Updates the results from last optimisation run. + + Args: + parameters: Hyperparameters used in optimisation. + loss: Validation loss obtained. + model: Model to serialised if required. + info: Any ancillary information to tag on to results. + + Returns: + Boolean flag indicating if the model is the best seen so far. + """ + + if np.isnan(loss): + loss = np.Inf + + if not os.path.isdir(self.hyperparam_folder): + os.makedirs(self.hyperparam_folder) + + name = self._get_name(parameters) + + is_optimal = self.results.empty or loss < self.best_score + + # save the first model + if is_optimal: + # Try saving first, before updating info + if model is not None: + print("Optimal model found, updating") + model.save(self.hyperparam_folder) + self.best_score = loss + self.optimal_name = name + + self.results[name] = pd.Series({"loss": loss, "info": info}) + self.saved_params[name] = pd.Series(parameters) + + self.results.to_csv(os.path.join(self.hyperparam_folder, "results.csv")) + self.saved_params.to_csv(os.path.join(self.hyperparam_folder, "params.csv")) + + return is_optimal + + +class DistributedHyperparamOptManager(HyperparamOptManager): + """Manages distributed hyperparameter optimisation across many gpus.""" + + def __init__(self, + param_ranges, + fixed_params, + root_model_folder, + worker_number, + search_iterations=1000, + num_iterations_per_worker=5, + clear_serialised_params=False): + """Instantiates optimisation manager. + + This hyperparameter optimisation pre-generates #search_iterations + hyperparameter combinations and serialises them + at the start. At runtime, each worker goes through their own set of + parameter ranges. The pregeneration + allows for multiple workers to run in parallel on different machines without + resulting in parameter overlaps. + + Args: + param_ranges: Discrete hyperparameter range for random search. + fixed_params: Fixed model parameters per experiment. + root_model_folder: Folder to store optimisation artifacts. + worker_number: Worker index definining which set of hyperparameters to + test. + search_iterations: Maximum numer of random search iterations. + num_iterations_per_worker: How many iterations are handled per worker. + clear_serialised_params: Whether to regenerate hyperparameter + combinations. + """ + + max_workers = int(np.ceil(search_iterations / num_iterations_per_worker)) + + # Sanity checks + if worker_number > max_workers: + raise ValueError( + "Worker number ({}) cannot be larger than the total number of workers!" + .format(max_workers)) + if worker_number > search_iterations: + raise ValueError( + "Worker number ({}) cannot be larger than the max search iterations ({})!" + .format(worker_number, search_iterations)) + + print("*** Creating hyperparameter manager for worker {} ***".format( + worker_number)) + + hyperparam_folder = os.path.join(root_model_folder, str(worker_number)) + super().__init__( + param_ranges, + fixed_params, + hyperparam_folder, + override_w_fixed_params=True) + + serialised_ranges_folder = os.path.join(root_model_folder, "hyperparams") + if clear_serialised_params: + print("Regenerating hyperparameter list") + if os.path.exists(serialised_ranges_folder): + shutil.rmtree(serialised_ranges_folder) + + utils.create_folder_if_not_exist(serialised_ranges_folder) + + self.serialised_ranges_path = os.path.join( + serialised_ranges_folder, "ranges_{}.csv".format(search_iterations)) + self.hyperparam_folder = hyperparam_folder # override + self.worker_num = worker_number + self.total_search_iterations = search_iterations + self.num_iterations_per_worker = num_iterations_per_worker + self.global_hyperparam_df = self.load_serialised_hyperparam_df() + self.worker_search_queue = self._get_worker_search_queue() + + @property + def optimisation_completed(self): + return False if self.worker_search_queue else True + + def get_next_parameters(self): + """Returns next dictionary of hyperparameters to optimise.""" + param_name = self.worker_search_queue.pop() + + params = self.global_hyperparam_df.loc[param_name, :].to_dict() + + # Always override! + for k in self.fixed_params: + print("Overriding saved {}: {}".format(k, self.fixed_params[k])) + + params[k] = self.fixed_params[k] + + return params + + def load_serialised_hyperparam_df(self): + """Loads serialsed hyperparameter ranges from file. + + Returns: + DataFrame containing hyperparameter combinations. + """ + print("Loading params for {} search iterations form {}".format( + self.total_search_iterations, self.serialised_ranges_path)) + + if os.path.exists(self.serialised_ranges_folder): + df = pd.read_csv(self.serialised_ranges_path, index_col=0) + else: + print("Unable to load - regenerating serach ranges instead") + df = self.update_serialised_hyperparam_df() + + return df + + def update_serialised_hyperparam_df(self): + """Regenerates hyperparameter combinations and saves to file. + + Returns: + DataFrame containing hyperparameter combinations. + """ + search_df = self._generate_full_hyperparam_df() + + print("Serialising params for {} search iterations to {}".format( + self.total_search_iterations, self.serialised_ranges_path)) + + search_df.to_csv(self.serialised_ranges_path) + + return search_df + + def _generate_full_hyperparam_df(self): + """Generates actual hyperparameter combinations. + + Returns: + DataFrame containing hyperparameter combinations. + """ + + np.random.seed(131) # for reproducibility of hyperparam list + + name_list = [] + param_list = [] + for _ in range(self.total_search_iterations): + params = super().get_next_parameters(name_list) + + name = self._get_name(params) + + name_list.append(name) + param_list.append(params) + + full_search_df = pd.DataFrame(param_list, index=name_list) + + return full_search_df + + def clear(self): # reset when cleared + """Clears results for hyperparameter manager and resets.""" + super().clear() + self.worker_search_queue = self._get_worker_search_queue() + + def load_results(self): + """Load results from file and queue parameter combinations to try. + + Returns: + Boolean indicating if results were successfully loaded. + """ + success = super().load_results() + + if success: + self.worker_search_queue = self._get_worker_search_queue() + + return success + + def _get_worker_search_queue(self): + """Generates the queue of param combinations for current worker. + + Returns: + Queue of hyperparameter combinations outstanding. + """ + global_df = self.assign_worker_numbers(self.global_hyperparam_df) + worker_df = global_df[global_df["worker"] == self.worker_num] + + left_overs = [s for s in worker_df.index if s not in self.results.columns] + + return Deque(left_overs) + + def assign_worker_numbers(self, df): + """Updates parameter combinations with the index of the worker used. + + Args: + df: DataFrame of parameter combinations. + + Returns: + Updated DataFrame with worker number. + """ + output = df.copy() + + n = self.total_search_iterations + batch_size = self.num_iterations_per_worker + + max_worker_num = int(np.ceil(n / batch_size)) + + worker_idx = np.concatenate([ + np.tile(i + 1, self.num_iterations_per_worker) + for i in range(max_worker_num) + ]) + + output["worker"] = worker_idx[:len(output)] + + return output diff --git a/examples/benchmarks/TFT/libs/tft_model.py b/examples/benchmarks/TFT/libs/tft_model.py new file mode 100644 index 000000000..2a41f4566 --- /dev/null +++ b/examples/benchmarks/TFT/libs/tft_model.py @@ -0,0 +1,1391 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Temporal Fusion Transformer Model. + +Contains the full TFT architecture and associated components. Defines functions +for training, evaluation and prediction using simple Pandas Dataframe inputs. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import gc +import json +import os +import shutil + +import data_formatters.base +import libs.utils as utils +import numpy as np +import pandas as pd +import tensorflow as tf + +# Layer definitions. +concat = tf.keras.backend.concatenate +stack = tf.keras.backend.stack +K = tf.keras.backend +Add = tf.keras.layers.Add +LayerNorm = tf.keras.layers.LayerNormalization +Dense = tf.keras.layers.Dense +Multiply = tf.keras.layers.Multiply +Dropout = tf.keras.layers.Dropout +Activation = tf.keras.layers.Activation +Lambda = tf.keras.layers.Lambda + +# Default input types. +InputTypes = data_formatters.base.InputTypes + + +# Layer utility functions. +def linear_layer(size, + activation=None, + use_time_distributed=False, + use_bias=True): + """Returns simple Keras linear layer. + + Args: + size: Output size + activation: Activation function to apply if required + use_time_distributed: Whether to apply layer across time + use_bias: Whether bias should be included in layer + """ + linear = tf.keras.layers.Dense(size, activation=activation, use_bias=use_bias) + if use_time_distributed: + linear = tf.keras.layers.TimeDistributed(linear) + return linear + + +def apply_mlp(inputs, + hidden_size, + output_size, + output_activation=None, + hidden_activation='tanh', + use_time_distributed=False): + """Applies simple feed-forward network to an input. + + Args: + inputs: MLP inputs + hidden_size: Hidden state size + output_size: Output size of MLP + output_activation: Activation function to apply on output + hidden_activation: Activation function to apply on input + use_time_distributed: Whether to apply across time + + Returns: + Tensor for MLP outputs. + """ + if use_time_distributed: + hidden = tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(hidden_size, activation=hidden_activation))( + inputs) + return tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(output_size, activation=output_activation))( + hidden) + else: + hidden = tf.keras.layers.Dense( + hidden_size, activation=hidden_activation)( + inputs) + return tf.keras.layers.Dense( + output_size, activation=output_activation)( + hidden) + + +def apply_gating_layer(x, + hidden_layer_size, + dropout_rate=None, + use_time_distributed=True, + activation=None): + """Applies a Gated Linear Unit (GLU) to an input. + + Args: + x: Input to gating layer + hidden_layer_size: Dimension of GLU + dropout_rate: Dropout rate to apply if any + use_time_distributed: Whether to apply across time + activation: Activation function to apply to the linear feature transform if + necessary + + Returns: + Tuple of tensors for: (GLU output, gate) + """ + + if dropout_rate is not None: + x = tf.keras.layers.Dropout(dropout_rate)(x) + + if use_time_distributed: + activation_layer = tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(hidden_layer_size, activation=activation))( + x) + gated_layer = tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(hidden_layer_size, activation='sigmoid'))( + x) + else: + activation_layer = tf.keras.layers.Dense( + hidden_layer_size, activation=activation)( + x) + gated_layer = tf.keras.layers.Dense( + hidden_layer_size, activation='sigmoid')( + x) + + return tf.keras.layers.Multiply()([activation_layer, + gated_layer]), gated_layer + + +def add_and_norm(x_list): + """Applies skip connection followed by layer normalisation. + + Args: + x_list: List of inputs to sum for skip connection + + Returns: + Tensor output from layer. + """ + tmp = Add()(x_list) + tmp = LayerNorm()(tmp) + return tmp + + +def gated_residual_network(x, + hidden_layer_size, + output_size=None, + dropout_rate=None, + use_time_distributed=True, + additional_context=None, + return_gate=False): + """Applies the gated residual network (GRN) as defined in paper. + + Args: + x: Network inputs + hidden_layer_size: Internal state size + output_size: Size of output layer + dropout_rate: Dropout rate if dropout is applied + use_time_distributed: Whether to apply network across time dimension + additional_context: Additional context vector to use if relevant + return_gate: Whether to return GLU gate for diagnostic purposes + + Returns: + Tuple of tensors for: (GRN output, GLU gate) + """ + + # Setup skip connection + if output_size is None: + output_size = hidden_layer_size + skip = x + else: + linear = Dense(output_size) + if use_time_distributed: + linear = tf.keras.layers.TimeDistributed(linear) + skip = linear(x) + + # Apply feedforward network + hidden = linear_layer( + hidden_layer_size, + activation=None, + use_time_distributed=use_time_distributed)( + x) + if additional_context is not None: + hidden = hidden + linear_layer( + hidden_layer_size, + activation=None, + use_time_distributed=use_time_distributed, + use_bias=False)( + additional_context) + hidden = tf.keras.layers.Activation('elu')(hidden) + hidden = linear_layer( + hidden_layer_size, + activation=None, + use_time_distributed=use_time_distributed)( + hidden) + + gating_layer, gate = apply_gating_layer( + hidden, + output_size, + dropout_rate=dropout_rate, + use_time_distributed=use_time_distributed, + activation=None) + + if return_gate: + return add_and_norm([skip, gating_layer]), gate + else: + return add_and_norm([skip, gating_layer]) + + +# Attention Components. +def get_decoder_mask(self_attn_inputs): + """Returns causal mask to apply for self-attention layer. + + Args: + self_attn_inputs: Inputs to self attention layer to determine mask shape + """ + len_s = tf.shape(self_attn_inputs)[1] + bs = tf.shape(self_attn_inputs)[:1] + mask = K.cumsum(tf.eye(len_s, batch_shape=bs), 1) + return mask + + +class ScaledDotProductAttention(): + """Defines scaled dot product attention layer. + + Attributes: + dropout: Dropout rate to use + activation: Normalisation function for scaled dot product attention (e.g. + softmax by default) + """ + + def __init__(self, attn_dropout=0.0): + self.dropout = Dropout(attn_dropout) + self.activation = Activation('softmax') + + def __call__(self, q, k, v, mask): + """Applies scaled dot product attention. + + Args: + q: Queries + k: Keys + v: Values + mask: Masking if required -- sets softmax to very large value + + Returns: + Tuple of (layer outputs, attention weights) + """ + temper = tf.sqrt(tf.cast(tf.shape(k)[-1], dtype='float32')) + attn = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[2, 2]) / temper)( + [q, k]) # shape=(batch, q, k) + if mask is not None: + mmask = Lambda(lambda x: (-1e+9) * (1. - K.cast(x, 'float32')))( + mask) # setting to infinity + attn = Add()([attn, mmask]) + attn = self.activation(attn) + attn = self.dropout(attn) + output = Lambda(lambda x: K.batch_dot(x[0], x[1]))([attn, v]) + return output, attn + + +class InterpretableMultiHeadAttention(): + """Defines interpretable multi-head attention layer. + + Attributes: + n_head: Number of heads + d_k: Key/query dimensionality per head + d_v: Value dimensionality + dropout: Dropout rate to apply + qs_layers: List of queries across heads + ks_layers: List of keys across heads + vs_layers: List of values across heads + attention: Scaled dot product attention layer + w_o: Output weight matrix to project internal state to the original TFT + state size + """ + + def __init__(self, n_head, d_model, dropout): + """Initialises layer. + + Args: + n_head: Number of heads + d_model: TFT state dimensionality + dropout: Dropout discard rate + """ + + self.n_head = n_head + self.d_k = self.d_v = d_k = d_v = d_model // n_head + self.dropout = dropout + + self.qs_layers = [] + self.ks_layers = [] + self.vs_layers = [] + + # Use same value layer to facilitate interp + vs_layer = Dense(d_v, use_bias=False) + + for _ in range(n_head): + self.qs_layers.append(Dense(d_k, use_bias=False)) + self.ks_layers.append(Dense(d_k, use_bias=False)) + self.vs_layers.append(vs_layer) # use same vs_layer + + self.attention = ScaledDotProductAttention() + self.w_o = Dense(d_model, use_bias=False) + + def __call__(self, q, k, v, mask=None): + """Applies interpretable multihead attention. + + Using T to denote the number of time steps fed into the transformer. + + Args: + q: Query tensor of shape=(?, T, d_model) + k: Key of shape=(?, T, d_model) + v: Values of shape=(?, T, d_model) + mask: Masking if required with shape=(?, T, T) + + Returns: + Tuple of (layer outputs, attention weights) + """ + n_head = self.n_head + + heads = [] + attns = [] + for i in range(n_head): + qs = self.qs_layers[i](q) + ks = self.ks_layers[i](k) + vs = self.vs_layers[i](v) + head, attn = self.attention(qs, ks, vs, mask) + + head_dropout = Dropout(self.dropout)(head) + heads.append(head_dropout) + attns.append(attn) + head = K.stack(heads) if n_head > 1 else heads[0] + attn = K.stack(attns) + + outputs = K.mean(head, axis=0) if n_head > 1 else head + outputs = self.w_o(outputs) + outputs = Dropout(self.dropout)(outputs) # output dropout + + return outputs, attn + + +class TFTDataCache(object): + """Caches data for the TFT.""" + + _data_cache = {} + + @classmethod + def update(cls, data, key): + """Updates cached data. + + Args: + data: Source to update + key: Key to dictionary location + """ + cls._data_cache[key] = data + + @classmethod + def get(cls, key): + """Returns data stored at key location.""" + return cls._data_cache[key].copy() + + @classmethod + def contains(cls, key): + """Retuns boolean indicating whether key is present in cache.""" + + return key in cls._data_cache + + +# TFT model definitions. +class TemporalFusionTransformer(object): + """Defines Temporal Fusion Transformer. + + Attributes: + name: Name of model + time_steps: Total number of input time steps per forecast date (i.e. Width + of Temporal fusion decoder N) + input_size: Total number of inputs + output_size: Total number of outputs + category_counts: Number of categories per categorical variable + n_multiprocessing_workers: Number of workers to use for parallel + computations + column_definition: List of tuples of (string, DataType, InputType) that + define each column + quantiles: Quantiles to forecast for TFT + use_cudnn: Whether to use Keras CuDNNLSTM or standard LSTM layers + hidden_layer_size: Internal state size of TFT + dropout_rate: Dropout discard rate + max_gradient_norm: Maximum norm for gradient clipping + learning_rate: Initial learning rate of ADAM optimizer + minibatch_size: Size of minibatches for training + num_epochs: Maximum number of epochs for training + early_stopping_patience: Maximum number of iterations of non-improvement + before early stopping kicks in + num_encoder_steps: Size of LSTM encoder -- i.e. number of past time steps + before forecast date to use + num_stacks: Number of self-attention layers to apply (default is 1 for basic + TFT) + num_heads: Number of heads for interpretable mulit-head attention + model: Keras model for TFT + """ + + def __init__(self, raw_params, use_cudnn=False): + """Builds TFT from parameters. + + Args: + raw_params: Parameters to define TFT + use_cudnn: Whether to use CUDNN GPU optimised LSTM + """ + + self.name = self.__class__.__name__ + + params = dict(raw_params) # copy locally + + # Data parameters + self.time_steps = int(params['total_time_steps']) + self.input_size = int(params['input_size']) + self.output_size = int(params['output_size']) + self.category_counts = json.loads(str(params['category_counts'])) + self.n_multiprocessing_workers = int(params['multiprocessing_workers']) + + # Relevant indices for TFT + self._input_obs_loc = json.loads(str(params['input_obs_loc'])) + self._static_input_loc = json.loads(str(params['static_input_loc'])) + self._known_regular_input_idx = json.loads( + str(params['known_regular_inputs'])) + self._known_categorical_input_idx = json.loads( + str(params['known_categorical_inputs'])) + + self.column_definition = params['column_definition'] + + # Network params + self.quantiles = [0.1, 0.5, 0.9] + self.use_cudnn = use_cudnn # Whether to use GPU optimised LSTM + self.hidden_layer_size = int(params['hidden_layer_size']) + self.dropout_rate = float(params['dropout_rate']) + self.max_gradient_norm = float(params['max_gradient_norm']) + self.learning_rate = float(params['learning_rate']) + self.minibatch_size = int(params['minibatch_size']) + self.num_epochs = int(params['num_epochs']) + self.early_stopping_patience = int(params['early_stopping_patience']) + + self.num_encoder_steps = int(params['num_encoder_steps']) + self.num_stacks = int(params['stack_size']) + self.num_heads = int(params['num_heads']) + + # Serialisation options + self._temp_folder = os.path.join(params['model_folder'], 'tmp') + self.reset_temp_folder() + + # Extra components to store Tensorflow nodes for attention computations + self._input_placeholder = None + self._attention_components = None + self._prediction_parts = None + + print('*** {} params ***'.format(self.name)) + for k in params: + print('# {} = {}'.format(k, params[k])) + + # Build model + self.model = self.build_model() + + def get_tft_embeddings(self, all_inputs): + """Transforms raw inputs to embeddings. + + Applies linear transformation onto continuous variables and uses embeddings + for categorical variables. + + Args: + all_inputs: Inputs to transform + + Returns: + Tensors for transformed inputs. + """ + + time_steps = self.time_steps + + # Sanity checks + for i in self._known_regular_input_idx: + if i in self._input_obs_loc: + raise ValueError('Observation cannot be known a priori!') + for i in self._input_obs_loc: + if i in self._static_input_loc: + raise ValueError('Observation cannot be static!') + + if all_inputs.get_shape().as_list()[-1] != self.input_size: + raise ValueError( + 'Illegal number of inputs! Inputs observed={}, expected={}'.format( + all_inputs.get_shape().as_list()[-1], self.input_size)) + + num_categorical_variables = len(self.category_counts) + num_regular_variables = self.input_size - num_categorical_variables + + embedding_sizes = [ + self.hidden_layer_size for i, size in enumerate(self.category_counts) + ] + + embeddings = [] + for i in range(num_categorical_variables): + + embedding = tf.keras.Sequential([ + tf.keras.layers.InputLayer([time_steps]), + tf.keras.layers.Embedding( + self.category_counts[i], + embedding_sizes[i], + input_length=time_steps, + dtype=tf.float32) + ]) + embeddings.append(embedding) + + regular_inputs, categorical_inputs \ + = all_inputs[:, :, :num_regular_variables], \ + all_inputs[:, :, num_regular_variables:] + + embedded_inputs = [ + embeddings[i](categorical_inputs[Ellipsis, i]) + for i in range(num_categorical_variables) + ] + + # Static inputs + if self._static_input_loc: + static_inputs = [tf.keras.layers.Dense(self.hidden_layer_size)( + regular_inputs[:, 0, i:i + 1]) for i in range(num_regular_variables) + if i in self._static_input_loc] \ + + [embedded_inputs[i][:, 0, :] + for i in range(num_categorical_variables) + if i + num_regular_variables in self._static_input_loc] + static_inputs = tf.keras.backend.stack(static_inputs, axis=1) + + else: + static_inputs = None + + def convert_real_to_embedding(x): + """Applies linear transformation for time-varying inputs.""" + return tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.hidden_layer_size))( + x) + + # Targets + obs_inputs = tf.keras.backend.stack([ + convert_real_to_embedding(regular_inputs[Ellipsis, i:i + 1]) + for i in self._input_obs_loc + ], + axis=-1) + + # Observed (a prioir unknown) inputs + wired_embeddings = [] + for i in range(num_categorical_variables): + if i not in self._known_categorical_input_idx \ + and i + num_regular_variables not in self._input_obs_loc: + e = embeddings[i](categorical_inputs[:, :, i]) + wired_embeddings.append(e) + + unknown_inputs = [] + for i in range(regular_inputs.shape[-1]): + if i not in self._known_regular_input_idx \ + and i not in self._input_obs_loc: + e = convert_real_to_embedding(regular_inputs[Ellipsis, i:i + 1]) + unknown_inputs.append(e) + + if unknown_inputs + wired_embeddings: + unknown_inputs = tf.keras.backend.stack( + unknown_inputs + wired_embeddings, axis=-1) + else: + unknown_inputs = None + + # A priori known inputs + known_regular_inputs = [ + convert_real_to_embedding(regular_inputs[Ellipsis, i:i + 1]) + for i in self._known_regular_input_idx + if i not in self._static_input_loc + ] + known_categorical_inputs = [ + embedded_inputs[i] + for i in self._known_categorical_input_idx + if i + num_regular_variables not in self._static_input_loc + ] + + known_combined_layer = tf.keras.backend.stack( + known_regular_inputs + known_categorical_inputs, axis=-1) + + return unknown_inputs, known_combined_layer, obs_inputs, static_inputs + + def _get_single_col_by_type(self, input_type): + """Returns name of single column for input type.""" + + return utils.get_single_col_by_input_type(input_type, + self.column_definition) + + def training_data_cached(self): + """Returns boolean indicating if training data has been cached.""" + + return TFTDataCache.contains('train') and TFTDataCache.contains('valid') + + def cache_batched_data(self, data, cache_key, num_samples=-1): + """Batches and caches data once for using during training. + + Args: + data: Data to batch and cache + cache_key: Key used for cache + num_samples: Maximum number of samples to extract (-1 to use all data) + """ + + if num_samples > 0: + TFTDataCache.update( + self._batch_sampled_data(data, max_samples=num_samples), cache_key) + else: + TFTDataCache.update(self._batch_data(data), cache_key) + + print('Cached data "{}" updated'.format(cache_key)) + + def _batch_sampled_data(self, data, max_samples): + """Samples segments into a compatible format. + + Args: + data: Sources data to sample and batch + max_samples: Maximum number of samples in batch + + Returns: + Dictionary of batched data with the maximum samples specified. + """ + + if max_samples < 1: + raise ValueError( + 'Illegal number of samples specified! samples={}'.format(max_samples)) + + id_col = self._get_single_col_by_type(InputTypes.ID) + time_col = self._get_single_col_by_type(InputTypes.TIME) + + data.sort_values(by=[id_col, time_col], inplace=True) + + print('Getting valid sampling locations.') + valid_sampling_locations = [] + split_data_map = {} + for identifier, df in data.groupby(id_col): + print('Getting locations for {}'.format(identifier)) + num_entries = len(df) + if num_entries >= self.time_steps: + valid_sampling_locations += [ + (identifier, self.time_steps + i) + for i in range(num_entries - self.time_steps + 1) + ] + split_data_map[identifier] = df + + inputs = np.zeros((max_samples, self.time_steps, self.input_size)) + outputs = np.zeros((max_samples, self.time_steps, self.output_size)) + time = np.empty((max_samples, self.time_steps, 1), dtype=object) + identifiers = np.empty((max_samples, self.time_steps, 1), dtype=object) + + if max_samples > 0 and len(valid_sampling_locations) > max_samples: + print('Extracting {} samples...'.format(max_samples)) + ranges = [ + valid_sampling_locations[i] for i in np.random.choice( + len(valid_sampling_locations), max_samples, replace=False) + ] + else: + print('Max samples={} exceeds # available segments={}'.format( + max_samples, len(valid_sampling_locations))) + ranges = valid_sampling_locations + + id_col = self._get_single_col_by_type(InputTypes.ID) + time_col = self._get_single_col_by_type(InputTypes.TIME) + target_col = self._get_single_col_by_type(InputTypes.TARGET) + input_cols = [ + tup[0] + for tup in self.column_definition + if tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + + for i, tup in enumerate(ranges): + if (i + 1 % 1000) == 0: + print(i + 1, 'of', max_samples, 'samples done...') + identifier, start_idx = tup + sliced = split_data_map[identifier].iloc[start_idx - + self.time_steps:start_idx] + inputs[i, :, :] = sliced[input_cols] + outputs[i, :, :] = sliced[[target_col]] + time[i, :, 0] = sliced[time_col] + identifiers[i, :, 0] = sliced[id_col] + + sampled_data = { + 'inputs': inputs, + 'outputs': outputs[:, self.num_encoder_steps:, :], + 'active_entries': np.ones_like(outputs[:, self.num_encoder_steps:, :]), + 'time': time, + 'identifier': identifiers + } + + return sampled_data + + def _batch_data(self, data): + """Batches data for training. + + Converts raw dataframe from a 2-D tabular format to a batched 3-D array + to feed into Keras model. + + Args: + data: DataFrame to batch + + Returns: + Batched Numpy array with shape=(?, self.time_steps, self.input_size) + """ + + # Functions. + def _batch_single_entity(input_data): + time_steps = len(input_data) + lags = self.time_steps + x = input_data.values + if time_steps >= lags: + return np.stack( + [x[i:time_steps - (lags - 1) + i, :] for i in range(lags)], axis=1) + + else: + return None + + id_col = self._get_single_col_by_type(InputTypes.ID) + time_col = self._get_single_col_by_type(InputTypes.TIME) + target_col = self._get_single_col_by_type(InputTypes.TARGET) + input_cols = [ + tup[0] + for tup in self.column_definition + if tup[2] not in {InputTypes.ID, InputTypes.TIME} + ] + + data_map = {} + for _, sliced in data.groupby(id_col): + + col_mappings = { + 'identifier': [id_col], + 'time': [time_col], + 'outputs': [target_col], + 'inputs': input_cols + } + + for k in col_mappings: + cols = col_mappings[k] + arr = _batch_single_entity(sliced[cols].copy()) + + if k not in data_map: + data_map[k] = [arr] + else: + data_map[k].append(arr) + + # Combine all data + for k in data_map: + # Wendi: Avoid returning None when the length is not enough + data_map[k] = np.concatenate([i for i in data_map[k] if i is not None], axis=0) + + # Shorten target so we only get decoder steps + data_map['outputs'] = data_map['outputs'][:, self.num_encoder_steps:, :] + + active_entries = np.ones_like(data_map['outputs']) + if 'active_entries' not in data_map: + data_map['active_entries'] = active_entries + else: + data_map['active_entries'].append(active_entries) + + return data_map + + def _get_active_locations(self, x): + """Formats sample weights for Keras training.""" + return (np.sum(x, axis=-1) > 0.0) * 1.0 + + def _build_base_graph(self): + """Returns graph defining layers of the TFT.""" + + # Size definitions. + time_steps = self.time_steps + combined_input_size = self.input_size + encoder_steps = self.num_encoder_steps + + # Inputs. + all_inputs = tf.keras.layers.Input( + shape=( + time_steps, + combined_input_size, + )) + + unknown_inputs, known_combined_layer, obs_inputs, static_inputs \ + = self.get_tft_embeddings(all_inputs) + + # Isolate known and observed historical inputs. + if unknown_inputs is not None: + historical_inputs = concat([ + unknown_inputs[:, :encoder_steps, :], + known_combined_layer[:, :encoder_steps, :], + obs_inputs[:, :encoder_steps, :] + ], + axis=-1) + else: + historical_inputs = concat([ + known_combined_layer[:, :encoder_steps, :], + obs_inputs[:, :encoder_steps, :] + ], + axis=-1) + + # Isolate only known future inputs. + future_inputs = known_combined_layer[:, encoder_steps:, :] + + def static_combine_and_mask(embedding): + """Applies variable selection network to static inputs. + + Args: + embedding: Transformed static inputs + + Returns: + Tensor output for variable selection network + """ + + # Add temporal features + _, num_static, _ = embedding.get_shape().as_list() + + flatten = tf.keras.layers.Flatten()(embedding) + + # Nonlinear transformation with gated residual network. + mlp_outputs = gated_residual_network( + flatten, + self.hidden_layer_size, + output_size=num_static, + dropout_rate=self.dropout_rate, + use_time_distributed=False, + additional_context=None) + + sparse_weights = tf.keras.layers.Activation('softmax')(mlp_outputs) + sparse_weights = K.expand_dims(sparse_weights, axis=-1) + + trans_emb_list = [] + for i in range(num_static): + e = gated_residual_network( + embedding[:, i:i + 1, :], + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=False) + trans_emb_list.append(e) + + transformed_embedding = concat(trans_emb_list, axis=1) + + combined = tf.keras.layers.Multiply()( + [sparse_weights, transformed_embedding]) + + static_vec = K.sum(combined, axis=1) + + return static_vec, sparse_weights + + static_encoder, static_weights = static_combine_and_mask(static_inputs) + + static_context_variable_selection = gated_residual_network( + static_encoder, + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=False) + static_context_enrichment = gated_residual_network( + static_encoder, + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=False) + static_context_state_h = gated_residual_network( + static_encoder, + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=False) + static_context_state_c = gated_residual_network( + static_encoder, + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=False) + + def lstm_combine_and_mask(embedding): + """Apply temporal variable selection networks. + + Args: + embedding: Transformed inputs. + + Returns: + Processed tensor outputs. + """ + + # Add temporal features + _, time_steps, embedding_dim, num_inputs = embedding.get_shape().as_list() + + flatten = K.reshape(embedding, + [-1, time_steps, embedding_dim * num_inputs]) + + expanded_static_context = K.expand_dims( + static_context_variable_selection, axis=1) + + # Variable selection weights + mlp_outputs, static_gate = gated_residual_network( + flatten, + self.hidden_layer_size, + output_size=num_inputs, + dropout_rate=self.dropout_rate, + use_time_distributed=True, + additional_context=expanded_static_context, + return_gate=True) + + sparse_weights = tf.keras.layers.Activation('softmax')(mlp_outputs) + sparse_weights = tf.expand_dims(sparse_weights, axis=2) + + # Non-linear Processing & weight application + trans_emb_list = [] + for i in range(num_inputs): + grn_output = gated_residual_network( + embedding[Ellipsis, i], + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=True) + trans_emb_list.append(grn_output) + + transformed_embedding = stack(trans_emb_list, axis=-1) + + combined = tf.keras.layers.Multiply()( + [sparse_weights, transformed_embedding]) + temporal_ctx = K.sum(combined, axis=-1) + + return temporal_ctx, sparse_weights, static_gate + + historical_features, historical_flags, _ = lstm_combine_and_mask( + historical_inputs) + future_features, future_flags, _ = lstm_combine_and_mask(future_inputs) + + # LSTM layer + def get_lstm(return_state): + """Returns LSTM cell initialized with default parameters.""" + if self.use_cudnn: + lstm = tf.keras.layers.CuDNNLSTM( + self.hidden_layer_size, + return_sequences=True, + return_state=return_state, + stateful=False, + ) + else: + lstm = tf.keras.layers.LSTM( + self.hidden_layer_size, + return_sequences=True, + return_state=return_state, + stateful=False, + # Additional params to ensure LSTM matches CuDNN, See TF 2.0 : + # (https://www.tensorflow.org/api_docs/python/tf/keras/layers/LSTM) + activation='tanh', + recurrent_activation='sigmoid', + recurrent_dropout=0, + unroll=False, + use_bias=True) + return lstm + + history_lstm, state_h, state_c \ + = get_lstm(return_state=True)(historical_features, + initial_state=[static_context_state_h, + static_context_state_c]) + + future_lstm = get_lstm(return_state=False)( + future_features, initial_state=[state_h, state_c]) + + lstm_layer = concat([history_lstm, future_lstm], axis=1) + + # Apply gated skip connection + input_embeddings = concat([historical_features, future_features], axis=1) + + lstm_layer, _ = apply_gating_layer( + lstm_layer, self.hidden_layer_size, self.dropout_rate, activation=None) + temporal_feature_layer = add_and_norm([lstm_layer, input_embeddings]) + + # Static enrichment layers + expanded_static_context = K.expand_dims(static_context_enrichment, axis=1) + enriched, _ = gated_residual_network( + temporal_feature_layer, + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=True, + additional_context=expanded_static_context, + return_gate=True) + + # Decoder self attention + self_attn_layer = InterpretableMultiHeadAttention( + self.num_heads, self.hidden_layer_size, dropout=self.dropout_rate) + + mask = get_decoder_mask(enriched) + x, self_att \ + = self_attn_layer(enriched, enriched, enriched, + mask=mask) + + x, _ = apply_gating_layer( + x, + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + activation=None) + x = add_and_norm([x, enriched]) + + # Nonlinear processing on outputs + decoder = gated_residual_network( + x, + self.hidden_layer_size, + dropout_rate=self.dropout_rate, + use_time_distributed=True) + + # Final skip connection + decoder, _ = apply_gating_layer( + decoder, self.hidden_layer_size, activation=None) + transformer_layer = add_and_norm([decoder, temporal_feature_layer]) + + # Attention components for explainability + attention_components = { + # Temporal attention weights + 'decoder_self_attn': self_att, + # Static variable selection weights + 'static_flags': static_weights[Ellipsis, 0], + # Variable selection weights of past inputs + 'historical_flags': historical_flags[Ellipsis, 0, :], + # Variable selection weights of future inputs + 'future_flags': future_flags[Ellipsis, 0, :] + } + + return transformer_layer, all_inputs, attention_components + + def build_model(self): + """Build model and defines training losses. + + Returns: + Fully defined Keras model. + """ + + with tf.variable_scope(self.name): + + transformer_layer, all_inputs, attention_components \ + = self._build_base_graph() + + outputs = tf.keras.layers.TimeDistributed( + tf.keras.layers.Dense(self.output_size * len(self.quantiles))) \ + (transformer_layer[Ellipsis, self.num_encoder_steps:, :]) + + self._attention_components = attention_components + + adam = tf.keras.optimizers.Adam( + lr=self.learning_rate, clipnorm=self.max_gradient_norm) + + model = tf.keras.Model(inputs=all_inputs, outputs=outputs) + + print(model.summary()) + + valid_quantiles = self.quantiles + output_size = self.output_size + + class QuantileLossCalculator(object): + """Computes the combined quantile loss for prespecified quantiles. + + Attributes: + quantiles: Quantiles to compute losses + """ + + def __init__(self, quantiles): + """Initializes computer with quantiles for loss calculations. + + Args: + quantiles: Quantiles to use for computations. + """ + self.quantiles = quantiles + + def quantile_loss(self, a, b): + """Returns quantile loss for specified quantiles. + + Args: + a: Targets + b: Predictions + """ + quantiles_used = set(self.quantiles) + + loss = 0. + for i, quantile in enumerate(valid_quantiles): + if quantile in quantiles_used: + loss += utils.tensorflow_quantile_loss( + a[Ellipsis, output_size * i:output_size * (i + 1)], + b[Ellipsis, output_size * i:output_size * (i + 1)], quantile) + return loss + + quantile_loss = QuantileLossCalculator(valid_quantiles).quantile_loss + + model.compile( + loss=quantile_loss, optimizer=adam, sample_weight_mode='temporal') + + self._input_placeholder = all_inputs + + return model + + def fit(self, train_df=None, valid_df=None): + """Fits deep neural network for given training and validation data. + + Args: + train_df: DataFrame for training data + valid_df: DataFrame for validation data + """ + + print('*** Fitting {} ***'.format(self.name)) + + # Add relevant callbacks + callbacks = [ + tf.keras.callbacks.EarlyStopping( + monitor='val_loss', + patience=self.early_stopping_patience, + min_delta=1e-4), + tf.keras.callbacks.ModelCheckpoint( + filepath=self.get_keras_saved_path(self._temp_folder), + monitor='val_loss', + save_best_only=True, + save_weights_only=True), + tf.keras.callbacks.TerminateOnNaN() + ] + + print('Getting batched_data') + if train_df is None: + print('Using cached training data') + train_data = TFTDataCache.get('train') + else: + train_data = self._batch_data(train_df) + + if valid_df is None: + print('Using cached validation data') + valid_data = TFTDataCache.get('valid') + else: + valid_data = self._batch_data(valid_df) + + print('Using keras standard fit') + + def _unpack(data): + return data['inputs'], data['outputs'], \ + self._get_active_locations(data['active_entries']) + + # Unpack without sample weights + data, labels, active_flags = _unpack(train_data) + val_data, val_labels, val_flags = _unpack(valid_data) + + all_callbacks = callbacks + + self.model.fit( + x=data, + y=np.concatenate([labels, labels, labels], axis=-1), + sample_weight=active_flags, + epochs=self.num_epochs, + batch_size=self.minibatch_size, + validation_data=(val_data, + np.concatenate([val_labels, val_labels, val_labels], + axis=-1), val_flags), + callbacks=all_callbacks, + shuffle=True, + use_multiprocessing=True, + workers=self.n_multiprocessing_workers) + + # Load best checkpoint again + tmp_checkpont = self.get_keras_saved_path(self._temp_folder) + if os.path.exists(tmp_checkpont): + self.load( + self._temp_folder, + use_keras_loadings=True) + + else: + print('Cannot load from {}, skipping ...'.format(self._temp_folder)) + + def evaluate(self, data=None, eval_metric='loss'): + """Applies evaluation metric to the training data. + + Args: + data: Dataframe for evaluation + eval_metric: Evaluation metic to return, based on model definition. + + Returns: + Computed evaluation loss. + """ + + if data is None: + print('Using cached validation data') + raw_data = TFTDataCache.get('valid') + else: + raw_data = self._batch_data(data) + + inputs = raw_data['inputs'] + outputs = raw_data['outputs'] + active_entries = self._get_active_locations(raw_data['active_entries']) + + metric_values = self.model.evaluate( + x=inputs, + y=np.concatenate([outputs, outputs, outputs], axis=-1), + sample_weight=active_entries, + workers=16, + use_multiprocessing=True) + + metrics = pd.Series(metric_values, self.model.metrics_names) + + return metrics[eval_metric] + + def predict(self, df, return_targets=False): + """Computes predictions for a given input dataset. + + Args: + df: Input dataframe + return_targets: Whether to also return outputs aligned with predictions to + faciliate evaluation + + Returns: + Input dataframe or tuple of (input dataframe, algined output dataframe). + """ + + data = self._batch_data(df) + + inputs = data['inputs'] + time = data['time'] + identifier = data['identifier'] + outputs = data['outputs'] + + combined = self.model.predict( + inputs, + workers=16, + use_multiprocessing=True, + batch_size=self.minibatch_size) + + # Format output_csv + if self.output_size != 1: + raise NotImplementedError('Current version only supports 1D targets!') + + def format_outputs(prediction): + """Returns formatted dataframes for prediction.""" + + flat_prediction = pd.DataFrame( + prediction[:, :, 0], + columns=[ + 't+{}'.format(i) + for i in range(self.time_steps - self.num_encoder_steps) + ]) + cols = list(flat_prediction.columns) + flat_prediction['forecast_time'] = time[:, self.num_encoder_steps - 1, 0] + flat_prediction['identifier'] = identifier[:, 0, 0] + + # Arrange in order + return flat_prediction[['forecast_time', 'identifier'] + cols] + + # Extract predictions for each quantile into different entries + process_map = { + 'p{}'.format(int(q * 100)): + combined[Ellipsis, i * self.output_size:(i + 1) * self.output_size] + for i, q in enumerate(self.quantiles) + } + + if return_targets: + # Add targets if relevant + process_map['targets'] = outputs + + return {k: format_outputs(process_map[k]) for k in process_map} + + def get_attention(self, df): + """Computes TFT attention weights for a given dataset. + + Args: + df: Input dataframe + + Returns: + Dictionary of numpy arrays for temporal attention weights and variable + selection weights, along with their identifiers and time indices + """ + + data = self._batch_data(df) + inputs = data['inputs'] + identifiers = data['identifier'] + time = data['time'] + + def get_batch_attention_weights(input_batch): + """Returns weights for a given minibatch of data.""" + input_placeholder = self._input_placeholder + attention_weights = {} + for k in self._attention_components: + attention_weight = tf.keras.backend.get_session().run( + self._attention_components[k], + {input_placeholder: input_batch.astype(np.float32)}) + attention_weights[k] = attention_weight + return attention_weights + + # Compute number of batches + batch_size = self.minibatch_size + n = inputs.shape[0] + num_batches = n // batch_size + if n - (num_batches * batch_size) > 0: + num_batches += 1 + + # Split up inputs into batches + batched_inputs = [ + inputs[i * batch_size:(i + 1) * batch_size, Ellipsis] + for i in range(num_batches) + ] + + # Get attention weights, while avoiding large memory increases + attention_by_batch = [ + get_batch_attention_weights(batch) for batch in batched_inputs + ] + attention_weights = {} + for k in self._attention_components: + attention_weights[k] = [] + for batch_weights in attention_by_batch: + attention_weights[k].append(batch_weights[k]) + + if len(attention_weights[k][0].shape) == 4: + tmp = np.concatenate(attention_weights[k], axis=1) + else: + tmp = np.concatenate(attention_weights[k], axis=0) + + del attention_weights[k] + gc.collect() + attention_weights[k] = tmp + + attention_weights['identifiers'] = identifiers[:, 0, 0] + attention_weights['time'] = time[:, :, 0] + + return attention_weights + + # Serialisation. + def reset_temp_folder(self): + """Deletes and recreates folder with temporary Keras training outputs.""" + print('Resetting temp folder...') + utils.create_folder_if_not_exist(self._temp_folder) + shutil.rmtree(self._temp_folder) + os.makedirs(self._temp_folder) + + def get_keras_saved_path(self, model_folder): + """Returns path to keras checkpoint.""" + return os.path.join(model_folder, '{}.check'.format(self.name)) + + def save(self, model_folder): + """Saves optimal TFT weights. + + Args: + model_folder: Location to serialze model. + """ + # Allows for direct serialisation of tensorflow variables to avoid spurious + # issue with Keras that leads to different performance evaluation results + # when model is reloaded (https://github.com/keras-team/keras/issues/4875). + + utils.save( + tf.keras.backend.get_session(), + model_folder, + cp_name=self.name, + scope=self.name) + + def load(self, model_folder, use_keras_loadings=False): + """Loads TFT weights. + + Args: + model_folder: Folder containing serialized models. + use_keras_loadings: Whether to load from Keras checkpoint. + + Returns: + + """ + if use_keras_loadings: + # Loads temporary Keras model saved during training. + serialisation_path = self.get_keras_saved_path(model_folder) + print('Loading model from {}'.format(serialisation_path)) + self.model.load_weights(serialisation_path) + else: + # Loads tensorflow graph for optimal models. + utils.load( + tf.keras.backend.get_session(), + model_folder, + cp_name=self.name, + scope=self.name) + + @classmethod + def get_hyperparm_choices(cls): + """Returns hyperparameter ranges for random search.""" + return { + 'dropout_rate': [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 0.9], + 'hidden_layer_size': [10, 20, 40, 80, 160, 240, 320], + 'minibatch_size': [64, 128, 256], + 'learning_rate': [1e-4, 1e-3, 1e-2], + 'max_gradient_norm': [0.01, 1.0, 100.0], + 'num_heads': [1, 4], + 'stack_size': [1], + } diff --git a/examples/benchmarks/TFT/libs/utils.py b/examples/benchmarks/TFT/libs/utils.py new file mode 100644 index 000000000..813d4b176 --- /dev/null +++ b/examples/benchmarks/TFT/libs/utils.py @@ -0,0 +1,236 @@ +# coding=utf-8 +# Copyright 2020 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Lint as: python3 +"""Generic helper functions used across codebase.""" + +import os +import pathlib + +import numpy as np +import tensorflow as tf +from tensorflow.python.tools.inspect_checkpoint import print_tensors_in_checkpoint_file + + +# Generic. +def get_single_col_by_input_type(input_type, column_definition): + """Returns name of single column. + + Args: + input_type: Input type of column to extract + column_definition: Column definition list for experiment + """ + + l = [tup[0] for tup in column_definition if tup[2] == input_type] + + if len(l) != 1: + raise ValueError('Invalid number of columns for {}'.format(input_type)) + + return l[0] + + +def extract_cols_from_data_type(data_type, column_definition, + excluded_input_types): + """Extracts the names of columns that correspond to a define data_type. + + Args: + data_type: DataType of columns to extract. + column_definition: Column definition to use. + excluded_input_types: Set of input types to exclude + + Returns: + List of names for columns with data type specified. + """ + return [ + tup[0] + for tup in column_definition + if tup[1] == data_type and tup[2] not in excluded_input_types + ] + + +# Loss functions. +def tensorflow_quantile_loss(y, y_pred, quantile): + """Computes quantile loss for tensorflow. + + Standard quantile loss as defined in the "Training Procedure" section of + the main TFT paper + + Args: + y: Targets + y_pred: Predictions + quantile: Quantile to use for loss calculations (between 0 & 1) + + Returns: + Tensor for quantile loss. + """ + + # Checks quantile + if quantile < 0 or quantile > 1: + raise ValueError( + 'Illegal quantile value={}! Values should be between 0 and 1.'.format( + quantile)) + + prediction_underflow = y - y_pred + q_loss = quantile * tf.maximum(prediction_underflow, 0.) + ( + 1. - quantile) * tf.maximum(-prediction_underflow, 0.) + + return tf.reduce_sum(q_loss, axis=-1) + + +def numpy_normalised_quantile_loss(y, y_pred, quantile): + """Computes normalised quantile loss for numpy arrays. + + Uses the q-Risk metric as defined in the "Training Procedure" section of the + main TFT paper. + + Args: + y: Targets + y_pred: Predictions + quantile: Quantile to use for loss calculations (between 0 & 1) + + Returns: + Float for normalised quantile loss. + """ + prediction_underflow = y - y_pred + weighted_errors = quantile * np.maximum(prediction_underflow, 0.) \ + + (1. - quantile) * np.maximum(-prediction_underflow, 0.) + + quantile_loss = weighted_errors.mean() + normaliser = y.abs().mean() + + return 2 * quantile_loss / normaliser + + +# OS related functions. +def create_folder_if_not_exist(directory): + """Creates folder if it doesn't exist. + + Args: + directory: Folder path to create. + """ + # Also creates directories recursively + pathlib.Path(directory).mkdir(parents=True, exist_ok=True) + + +# Tensorflow related functions. +def get_default_tensorflow_config(tf_device='gpu', gpu_id=0): + """Creates tensorflow config for graphs to run on CPU or GPU. + + Specifies whether to run graph on gpu or cpu and which GPU ID to use for multi + GPU machines. + + Args: + tf_device: 'cpu' or 'gpu' + gpu_id: GPU ID to use if relevant + + Returns: + Tensorflow config. + """ + + if tf_device == 'cpu': + os.environ['CUDA_VISIBLE_DEVICES'] = '-1' # for training on cpu + tf_config = tf.ConfigProto( + log_device_placement=False, device_count={'GPU': 0}) + + else: + os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' + os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id) + + print('Selecting GPU ID={}'.format(gpu_id)) + + tf_config = tf.ConfigProto(log_device_placement=False) + tf_config.gpu_options.allow_growth = True + + return tf_config + + +def save(tf_session, model_folder, cp_name, scope=None): + """Saves Tensorflow graph to checkpoint. + + Saves all trainiable variables under a given variable scope to checkpoint. + + Args: + tf_session: Session containing graph + model_folder: Folder to save models + cp_name: Name of Tensorflow checkpoint + scope: Variable scope containing variables to save + """ + # Save model + if scope is None: + saver = tf.train.Saver() + else: + var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=scope) + saver = tf.train.Saver(var_list=var_list, max_to_keep=100000) + + save_path = saver.save(tf_session, + os.path.join(model_folder, '{0}.ckpt'.format(cp_name))) + print('Model saved to: {0}'.format(save_path)) + + +def load(tf_session, model_folder, cp_name, scope=None, verbose=False): + """Loads Tensorflow graph from checkpoint. + + Args: + tf_session: Session to load graph into + model_folder: Folder containing serialised model + cp_name: Name of Tensorflow checkpoint + scope: Variable scope to use. + verbose: Whether to print additional debugging information. + """ + # Load model proper + load_path = os.path.join(model_folder, '{0}.ckpt'.format(cp_name)) + + print('Loading model from {0}'.format(load_path)) + + print_weights_in_checkpoint(model_folder, cp_name) + + initial_vars = set( + [v.name for v in tf.get_default_graph().as_graph_def().node]) + + # Saver + if scope is None: + saver = tf.train.Saver() + else: + var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=scope) + saver = tf.train.Saver(var_list=var_list, max_to_keep=100000) + # Load + saver.restore(tf_session, load_path) + all_vars = set([v.name for v in tf.get_default_graph().as_graph_def().node]) + + if verbose: + print('Restored {0}'.format(','.join(initial_vars.difference(all_vars)))) + print('Existing {0}'.format(','.join(all_vars.difference(initial_vars)))) + print('All {0}'.format(','.join(all_vars))) + + print('Done.') + + +def print_weights_in_checkpoint(model_folder, cp_name): + """Prints all weights in Tensorflow checkpoint. + + Args: + model_folder: Folder containing checkpoint + cp_name: Name of checkpoint + + Returns: + + """ + load_path = os.path.join(model_folder, '{0}.ckpt'.format(cp_name)) + + print_tensors_in_checkpoint_file( + file_name=load_path, + tensor_name='', + all_tensors=True, + all_tensor_names=True) diff --git a/examples/benchmarks/TFT/tft.py b/examples/benchmarks/TFT/tft.py new file mode 100644 index 000000000..ee49a1eb7 --- /dev/null +++ b/examples/benchmarks/TFT/tft.py @@ -0,0 +1,246 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import numpy as np +import pandas as pd +import tensorflow.compat.v1 as tf +import data_formatters.base +import expt_settings.configs +import libs.hyperparam_opt +import libs.tft_model +import libs.utils as utils +import os +import datetime as dte + + +from qlib.model.base import ModelFT +from qlib.data.dataset import DatasetH +from qlib.data.dataset.handler import DataHandlerLP + + + +# To register new datasets, please add them here. +ALLOW_DATASET = ['Alpha158'] +DATASET_SETTING = { + 'Alpha158': { + 'feature_col': ['RESI5', 'WVMA5', 'RSQR5', 'KLEN', 'RSQR10', 'CORR5', 'CORD5', 'CORR10', 'ROC60', 'RESI10'], + 'label_col': ['LABEL0'], + }, +} +# To register new datasets, please add their configurations here. + +def get_shifted_label(data_df, shifts=5, col_shift='LABEL0'): + return data_df[[col_shift]].groupby('instrument').apply(lambda df: df.shift(shifts)) + +def fill_test_na(test_df): + test_df_res = test_df.copy() + feature_cols = ~test_df_res.columns.str.contains('label', case=False) + test_feature_fna = test_df_res.loc[:, feature_cols].groupby('datetime').apply(lambda df: df.fillna(df.mean())) + test_df_res.loc[:, feature_cols] = test_feature_fna + return test_df_res + +def process_qlib_data(df, dataset, fillna=False): + """Prepare data to fit the TFT model. + + Args: + df: Original DataFrame. + fillna: Whether to fill the data with the mean values. + + Returns: + Transformed DataFrame. + + """ + # Several features selected manually + feature_col = DATASET_SETTING[dataset]['feature_col'] + label_col = DATASET_SETTING[dataset]['label_col'] + temp_df = df.loc[:, feature_col+label_col] + if fillna: + temp_df = fill_test_na(temp_df) + temp_df = temp_df.swaplevel() + temp_df = temp_df.sort_index() + temp_df = temp_df.reset_index(level=0) + dates = pd.to_datetime(temp_df.index) + temp_df['date'] = dates + temp_df['day_of_week'] = dates.dayofweek + temp_df['month'] = dates.month + temp_df['year'] = dates.year + temp_df['const'] = 1.0 + return temp_df + +def process_predicted(df, col_name): + """Transform the TFT predicted data into Qlib format. + + Args: + df: Original DataFrame. + fillna: New column name. + + Returns: + Transformed DataFrame. + + """ + df_res = df.copy() + df_res = df_res.rename(columns={"forecast_time": "datetime", "identifier": "instrument", "t+0": col_name}) + df_res = df_res.set_index(['datetime','instrument']).sort_index() + df_res = df_res[[col_name]] + return df_res + +def format_score(forecast_df, col_name='pred', label_shift=5): + pred = process_predicted(forecast_df, col_name=col_name) + pred = get_shifted_label(pred, shifts=-label_shift, col_shift=col_name) + pred = pred.dropna()[col_name] + return pred + +def transform_df(df, col_name='LABEL0'): + df_res = df['feature'] + df_res[col_name] = df['label'] + return df_res + +class TFTModel(ModelFT): + """TFT Model""" + + def __init__(self, **kwargs): + self.model = None + + def _prepare_data(self, dataset: DatasetH): + df_train, df_valid = dataset.prepare( + ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + ) + return transform_df(df_train), transform_df(df_valid) + + def fit( + self, + dataset: DatasetH, + DATASET = 'Alpha158', + MODEL_FOLDER = 'qlib_alpha158_model', + LABEL_COL = 'LABEL0', + LABEL_SHIFT = 5, + USE_GPU_ID = 0, + **kwargs + ): + + if DATASET not in ALLOW_DATASET: + raise AssertionError("The dataset is not supported, please make a new formatter to fit this dataset") + + dtrain, dvalid = self._prepare_data(dataset) + dtrain.loc[:, LABEL_COL] = get_shifted_label(dtrain, shifts=LABEL_SHIFT, col_shift=LABEL_COL) + dvalid.loc[:, LABEL_COL] = get_shifted_label(dvalid, shifts=LABEL_SHIFT, col_shift=LABEL_COL) + + + train = process_qlib_data(dtrain, DATASET, fillna=True).dropna() + valid = process_qlib_data(dvalid, DATASET, fillna=True).dropna() + + ExperimentConfig = expt_settings.configs.ExperimentConfig + config = ExperimentConfig(DATASET) + self.data_formatter = config.make_data_formatter() + self.model_folder = MODEL_FOLDER + self.gpu_id = USE_GPU_ID + self.label_shift = LABEL_SHIFT + self.expt_name = DATASET + self.label_col = LABEL_COL + + use_gpu = (True, self.gpu_id) + #===========================Training Process=========================== + ModelClass = libs.tft_model.TemporalFusionTransformer + if not isinstance(self.data_formatter, data_formatters.base.GenericDataFormatter): + raise ValueError( + "Data formatters should inherit from" + + "AbstractDataFormatter! Type={}".format(type(self.data_formatter))) + + default_keras_session = tf.keras.backend.get_session() + + if use_gpu[0]: + self.tf_config = utils.get_default_tensorflow_config(tf_device="gpu", gpu_id=use_gpu[1]) + else: + self.tf_config = utils.get_default_tensorflow_config(tf_device="cpu") + + self.data_formatter.set_scalers(train) + + # Sets up default params + fixed_params = self.data_formatter.get_experiment_params() + params = self.data_formatter.get_default_model_params() + + # Wendi: 合并调优的参数和非调优的参数 + params = {**params, **fixed_params} + + if not os.path.exists(self.model_folder): + os.makedirs(self.model_folder) + params['model_folder'] = self.model_folder + + print("*** Begin training ***") + best_loss = np.Inf + + tf.reset_default_graph() + + self.tf_graph = tf.Graph() + with self.tf_graph.as_default(): + self.sess = tf.Session(config=self.tf_config) + tf.keras.backend.set_session(self.sess) + self.model = ModelClass(params, use_cudnn=use_gpu[0]) + self.sess.run(tf.global_variables_initializer()) + self.model.fit(train_df=train, valid_df=valid) + print("*** Finished training ***") + saved_model_dir = self.model_folder+'/'+'saved_model' + if not os.path.exists(saved_model_dir): + os.makedirs(saved_model_dir) + self.model.save(saved_model_dir) + + def extract_numerical_data(data): + """Strips out forecast time and identifier columns.""" + return data[[ + col for col in data.columns + if col not in {"forecast_time", "identifier"} + ]] + + #p50_loss = utils.numpy_normalised_quantile_loss( + # extract_numerical_data(targets), extract_numerical_data(p50_forecast), + # 0.5) + #p90_loss = utils.numpy_normalised_quantile_loss( + # extract_numerical_data(targets), extract_numerical_data(p90_forecast), + # 0.9) + tf.keras.backend.set_session(default_keras_session) + print("Training completed.".format(dte.datetime.now())) + #===========================Training Process=========================== + + def predict(self, dataset): + if self.model is None: + raise ValueError("model is not fitted yet!") + d_test = dataset.prepare("test", col_set=["feature", "label"]) + d_test = transform_df(d_test) + d_test.loc[:, self.label_col] = get_shifted_label(d_test, shifts=self.label_shift, col_shift=self.label_col) + test = process_qlib_data(d_test, self.expt_name, fillna=True).dropna() + + use_gpu = (True, self.gpu_id) + #===========================Predicting Process=========================== + default_keras_session = tf.keras.backend.get_session() + + # Sets up default params + fixed_params = self.data_formatter.get_experiment_params() + params = self.data_formatter.get_default_model_params() + params = {**params, **fixed_params} + + + print("*** Begin predicting ***") + tf.reset_default_graph() + + with self.tf_graph.as_default(): + tf.keras.backend.set_session(self.sess) + output_map = self.model.predict(test, return_targets=True) + targets = self.data_formatter.format_predictions(output_map["targets"]) + p50_forecast = self.data_formatter.format_predictions(output_map["p50"]) + p90_forecast = self.data_formatter.format_predictions(output_map["p90"]) + tf.keras.backend.set_session(default_keras_session) + + predict = format_score(p90_forecast, 'pred', self.label_shift) + label = format_score(targets, 'label', self.label_shift) + #===========================Predicting Process=========================== + return predict, label + + def finetune(self, dataset: DatasetH): + """ + finetune model + Parameters + ---------- + dataset : DatasetH + dataset for finetuning + """ + pass diff --git a/examples/benchmarks/TFT/workflow_by_code_tft.py b/examples/benchmarks/TFT/workflow_by_code_tft.py new file mode 100644 index 000000000..593ac468f --- /dev/null +++ b/examples/benchmarks/TFT/workflow_by_code_tft.py @@ -0,0 +1,130 @@ + #Copyright (c) Microsoft Corporation. + #Licensed under the MIT License. + +import sys +from pathlib import Path + +import qlib +import pandas as pd +from qlib.config import REG_CN +from qlib.contrib.model.pytorch_lstm import LSTM +from qlib.contrib.data.handler import ALPHA360_Denoise +from qlib.contrib.strategy.strategy import TopkDropoutStrategy +from qlib.contrib.evaluate import ( + backtest as normal_backtest, + risk_analysis, +) +from qlib.utils import exists_qlib_data + +# from qlib.model.learner import train_model +from qlib.utils import init_instance_by_config + +import pickle +from tft import TFTModel + +if __name__ == "__main__": + + # use default data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts"))) + from get_data import GetData + + GetData().qlib_data_cn(target_dir=provider_uri) + + qlib.init(provider_uri=provider_uri, region=REG_CN) + + MARKET = "csi300" + BENCHMARK = "SH000300" + + ################################### + # train model + ################################### + DATA_HANDLER_CONFIG = { + "start_time": "2008-01-01", + "end_time": "2020-08-01", + "fit_start_time": "2008-01-01", + "fit_end_time": "2014-12-31", + "instruments": MARKET, + } + + TRAINER_CONFIG = { + "train_start_time": "2008-01-01", + "train_end_time": "2014-12-31", + "validate_start_time": "2015-01-01", + "validate_end_time": "2016-12-31", + "test_start_time": "2017-01-01", + "test_end_time": "2020-08-01", + } + + task = { + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + 'handler': { + "class": "Alpha158", + "module_path": "qlib.contrib.data.handler", + "kwargs": DATA_HANDLER_CONFIG + }, + 'segments': { + 'train': ("2008-01-01", "2014-12-31"), + 'valid': ("2015-01-01", "2016-12-31",), + 'test': ("2017-01-01", "2020-08-01",), + } + } + } + # You shoud record the data in specific sequence + # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], + } + + + model = TFTModel() + dataset = init_instance_by_config(task["dataset"]) + model.fit(dataset) + + pred_score, label_score = model.predict(dataset) + + # save pred_score to file + pred_score_path = Path("~/tmp/qlib/pred_score.pkl").expanduser() + pred_score_path.parent.mkdir(exist_ok=True, parents=True) + pred_score.to_pickle(pred_score_path) + + + ################################### + # backtest + ################################### + STRATEGY_CONFIG = { + "topk": 50, + "n_drop": 5, + } + BACKTEST_CONFIG = { + "verbose": False, + "limit_threshold": 0.095, + "account": 100000000, + "benchmark": BENCHMARK, + "deal_price": "close", + "open_cost": 0.0005, + "close_cost": 0.0015, + "min_cost": 5, + } + + # use default strategy + # custom Strategy, refer to: TODO: Strategy API url + strategy = TopkDropoutStrategy(**STRATEGY_CONFIG) + report_normal, positions_normal = normal_backtest(pred_score, strategy=strategy, **BACKTEST_CONFIG) + + ################################### + # analyze + # If need a more detailed analysis, refer to: examples/train_and_bakctest.ipynb + ################################### + analysis = dict() + analysis["excess_return_without_cost"] = risk_analysis(report_normal["return"] - report_normal["bench"]) + analysis["excess_return_with_cost"] = risk_analysis( + report_normal["return"] - report_normal["bench"] - report_normal["cost"] + ) + analysis_df = pd.concat(analysis) # type: pd.DataFrame + print(analysis_df) + +