# Source code for tools.neural_embedder

"""
neural_embedder.py
====================================
Module to turn categorical features into embeddings.
"""

import numpy as np
import pandas as pd
import pickle
import tensorflow as tf
import utility.embedder as utility
from typing import List, Tuple
from pandas.api.types import is_string_dtype, is_numeric_dtype
from matplotlib.figure import Figure
from pathlib import Path

class NetworkCategory:
    """
    Used to store fields related to a given category, such as its name,
    count of unique values and the size of each embedding layer.

    :ivar alias: name of the category (the DataFrame column name)
    :ivar unique_values: number of distinct values observed in the category
    :ivar embedding_size: output dimension to use for the Embedding layer
    """

    # Lower/upper bounds applied to the computed embedding dimension.
    MIN_EMBEDDING_SIZE = 2
    MAX_EMBEDDING_SIZE = 50

    def __init__(self, alias: str, unique_values: int):
        self.alias = alias
        self.unique_values = unique_values
        self.embedding_size = self.get_embedding_size(unique_values)

    def __repr__(self) -> str:
        return (f"{type(self).__name__}(alias={self.alias!r}, "
                f"unique_values={self.unique_values!r}, "
                f"embedding_size={self.embedding_size!r})")

    def get_embedding_size(self, unique_values: int) -> int:
        """
        Return the embedding size to be used on the Embedding layer.

        The size is half the number of unique values (rounded up), clamped
        to the range [MIN_EMBEDDING_SIZE, MAX_EMBEDDING_SIZE].

        :param unique_values: the number of unique values in the given category
        :return: the size to be used on the embedding layer
        """
        half_cardinality = int(np.ceil(unique_values / 2))
        return max(self.MIN_EMBEDDING_SIZE,
                   min(half_cardinality, self.MAX_EMBEDDING_SIZE))
class NeuralEmbedder:
    """
    A neural embedder that can learn entity embeddings from categorical
    features.

    On construction the DataFrame columns are split by dtype: string-typed
    columns become embedding inputs, numeric columns are recorded in
    ``numerical_categories``, and the target column is excluded from both.
    A compiled ``tf.keras.Model`` is then built and stored in ``self.model``.
    """

    # File/directory names used beneath ``model_path``. Kept as class-level
    # constants; they remain reachable through ``self`` as before.
    DEFAULT_WEIGHTS_FILENAME = 'weights.pkl'
    DEFAULT_LABELS_FILENAME = 'labels.pkl'
    DEFAULT_SCALER_FILENAME = 'scaler.pkl'
    DEFAULT_PATH_VISUALIZATIONS = 'visualizations'

    def __init__(self,
                 df: pd.DataFrame,
                 target_name: str,
                 metrics: List[str],
                 train_ratio: float = 0.8,
                 network_layers: List[int] = (32, 32),
                 dropout_rate: float = 0,
                 activation_fn: str = "relu",
                 kernel_initializer: str = "glorot_uniform",
                 regularization_factor: float = 0,
                 loss_fn: str = 'binary_crossentropy',
                 optimizer_fn: str = 'Adam',
                 epochs: int = 10,
                 batch_size: int = 32,
                 verbose: bool = False,
                 model_path: str = 'models'):
        """
        Validate the arguments, store the configuration and build the model.

        :param df: the dataframe holding the features and the target column
        :param target_name: name of the target column in ``df``
        :param metrics: metrics passed to ``Model.compile``
        :param train_ratio: ratio of data used for training (validated and
            stored; not otherwise used inside this class)
        :param network_layers: sizes of the hidden Dense layers
        :param dropout_rate: dropout rate; a falsy value disables dropout
        :param activation_fn: activation used after each hidden Dense layer
        :param kernel_initializer: kernel initializer for the Dense layers
        :param regularization_factor: L2 factor; a falsy value disables it
        :param loss_fn: loss passed to ``Model.compile``
        :param optimizer_fn: optimizer passed to ``Model.compile``
        :param epochs: number of epochs used by :meth:`fit`
        :param batch_size: batch size used by :meth:`fit`
        :param verbose: verbosity flag forwarded to ``Model.fit``
        :param model_path: directory where the model and artifacts are stored
        """
        # Argument validation is delegated to the project's utility module;
        # presumably each check raises on invalid input (defined elsewhere).
        utility.check_not_empty_dataframe(df)
        utility.check_target_name(target_name)
        utility.check_target_existent_in_df(target_name, df)
        utility.check_train_ratio(train_ratio)
        utility.check_epochs(epochs)
        utility.check_batch_size(batch_size)

        self.df = df
        self.target_name = target_name
        self.train_ratio = train_ratio
        self.epochs = epochs
        self.batch_size = batch_size
        self.network_layers = network_layers
        self.dropout_rate = dropout_rate
        self.activation_fn = activation_fn
        self.kernel_initializer = kernel_initializer
        self.regularization_factor = regularization_factor
        self.loss_fn = loss_fn
        self.optimizer_fn = optimizer_fn
        self.metrics = metrics
        self.verbose = verbose
        self.model_path = model_path

        self.unique_classes = self.df[self.target_name].nunique()
        self.embedded_categories = self.__get_categorial_cols(df, target_name)
        self.numerical_categories = self.__get_numerical_cols(df, target_name)

        self.model = self.__make_model()

    def __get_categorial_cols(self, df: pd.DataFrame, label_name: str) -> List:
        """
        Returns a list of the categories from a given pandas DataFrame, with
        the exception of the provided target name.

        Only columns for which ``is_string_dtype`` is true are included.

        :param df: the dataframe
        :param label_name: the name of the label column to not be included
        :return: a List of NetworkCategory, one per string-typed column
        """
        return [NetworkCategory(column, df[column].nunique())
                for column in df
                if column != label_name and is_string_dtype(df[column])]

    def __get_numerical_cols(self, df: pd.DataFrame, label_name: str) -> List:
        """
        Generates a list of numerical categories from a dataframe.

        :param df: the dataframe
        :param label_name: name of the label
        :return: a List with the names of the numerical columns
        """
        return [column
                for column in df
                if column != label_name and is_numeric_dtype(df[column])]

    def __make_model(self) -> tf.keras.Model:
        """
        This method is used to generate our Model containing the Embedding
        layers alongside with the output layers.

        :return: a compiled Model object
        """
        model_inputs, model_outputs = self._make_embedding_layers()

        output_model = self._make_hidden_layers(model_outputs,
                                                self.network_layers,
                                                self.dropout_rate,
                                                self.activation_fn,
                                                self.kernel_initializer,
                                                self.regularization_factor)
        output_model = self._make_final_layer(output_model)

        model = tf.keras.Model(inputs=model_inputs, outputs=output_model)
        return self._compile_model(model, self.loss_fn, self.optimizer_fn,
                                   self.metrics)

    def _make_hidden_layers(self,
                            outputs: List[tf.keras.layers.Layer],
                            network_layers: List[int],
                            dropout_rate: float,
                            activation_fn: str,
                            kernel_initializer: str,
                            regularization_factor: float) -> tf.keras.layers.Layer:
        """
        This method constructs the hidden layers of the neural model.

        :param outputs: the outputs of the previous layers
        :param network_layers: a List with sizes of Dense units
        :param dropout_rate: dropout rate to use (falsy disables dropout)
        :param activation_fn: activation function to use
        :param kernel_initializer: kernel initializer to use
        :param regularization_factor: L2 regularization factor to use
            (falsy disables regularization)
        :return: the output of the stacked hidden layers
        :raises ValueError: if ``outputs`` is empty
        """
        if not outputs:
            raise ValueError("At least one embedding output is required to "
                             "build the hidden layers")

        # Concatenate is meant for merging several tensors; with a single
        # embedding there is nothing to merge, so pass it through directly.
        if len(outputs) == 1:
            output_model = outputs[0]
        else:
            output_model = tf.keras.layers.Concatenate()(outputs)

        for units in network_layers:
            dense_kwargs = {'kernel_initializer': kernel_initializer}
            if regularization_factor:
                dense_kwargs['kernel_regularizer'] = \
                    tf.keras.regularizers.l2(regularization_factor)
                dense_kwargs['bias_regularizer'] = \
                    tf.keras.regularizers.l2(regularization_factor)

            output_model = tf.keras.layers.Dense(units, **dense_kwargs)(output_model)
            output_model = tf.keras.layers.Activation(activation_fn)(output_model)

            if dropout_rate:
                output_model = tf.keras.layers.AlphaDropout(dropout_rate)(output_model)

        return output_model

    def _make_embedding_layers(self) -> Tuple[List[tf.keras.layers.Layer], List[tf.keras.layers.Layer]]:
        """
        This method is used to generate the list of inputs and output layers
        where our Embedding layers will be placed.

        Each Embedding layer is named after its category alias so its weights
        can later be looked up by name.

        :return: a tuple containing the input layers and the output layers
        """
        embedding_inputs = []
        embedding_outputs = []

        for category in self.embedded_categories:
            input_category = tf.keras.layers.Input(shape=(1,))
            output_category = tf.keras.layers.Embedding(
                input_dim=category.unique_values,
                output_dim=category.embedding_size,
                name=category.alias)(input_category)
            output_category = tf.keras.layers.Reshape(
                target_shape=(category.embedding_size,))(output_category)

            embedding_inputs.append(input_category)
            embedding_outputs.append(output_category)

        return embedding_inputs, embedding_outputs

    def _make_numerical_layers(self,
                               kernel_initializer: str,
                               activation_fn: str) -> Tuple[List[tf.keras.layers.Layer], List[tf.keras.layers.Layer]]:
        """
        This method constructs the numerical layers of the model.

        NOTE: this method is not called when the model is built by this
        class; it is kept available for callers that need it.

        :param kernel_initializer: kernel initializer to use
        :param activation_fn: activation function to use
        :return: a tuple containing the input layers and the output layers
        """
        numerical_inputs = []
        numerical_outputs = []

        for category in self.numerical_categories:
            input_category = tf.keras.layers.Input(shape=(1,))
            output_category = tf.keras.layers.Dense(
                1,
                name=category,
                kernel_initializer=kernel_initializer,
                activation=activation_fn)(input_category)

            numerical_inputs.append(input_category)
            numerical_outputs.append(output_category)

        return numerical_inputs, numerical_outputs

    def _make_final_layer(self, previous_layer: tf.keras.layers.Layer) -> tf.keras.layers.Layer:
        """
        This method constructs the final layer: a single-unit Dense layer
        followed by a sigmoid activation.

        :param previous_layer: the previous layer
        :return: the output of the final layer
        """
        output_model = tf.keras.layers.Dense(1)(previous_layer)
        return tf.keras.layers.Activation('sigmoid')(output_model)

    def _compile_model(self,
                       model: tf.keras.Model,
                       loss_fn: str,
                       optimizer_fn: str,
                       metrics: List[str]) -> tf.keras.Model:
        """
        This method compiles the model.

        :param model: the tf.keras Model
        :param loss_fn: loss function to use
        :param optimizer_fn: optimizer function to use
        :param metrics: metrics to use
        :return: the compiled model
        """
        model.compile(loss=loss_fn, optimizer=optimizer_fn, metrics=metrics)
        return model

    def fit(self,
            X_train: np.ndarray,
            y_train: np.ndarray,
            X_valid: np.ndarray,
            y_valid: np.ndarray,
            callbacks=None,
            class_weight: dict = None) -> tf.keras.callbacks.History:
        """
        This method is used to fit a given training and validation data into
        our entity embeddings model.

        The feature arrays are passed through ``utility.transpose_to_list``
        before being handed to Keras (presumably one array per model input;
        the helper is defined elsewhere).

        :param X_train: training features
        :param y_train: training targets
        :param X_valid: validation features
        :param y_valid: validation targets
        :param callbacks: any desired callbacks
        :param class_weight: any desired class weight
        :return: a History object
        """
        return self.model.fit(
            x=utility.transpose_to_list(X_train),
            y=y_train,
            validation_data=(utility.transpose_to_list(X_valid), y_valid),
            epochs=self.epochs,
            batch_size=self.batch_size,
            callbacks=callbacks,
            class_weight=class_weight,
            verbose=self.verbose)

    def save_model(self) -> None:
        """
        This method saves the current model to ``model_path``.

        :return: None
        """
        self.model.save(self.model_path)

    def get_embedded_weights(self) -> List:
        """
        This method extracts the weights of the embedded layers.

        The numerical columns and the target column are skipped; every
        remaining column is looked up as a layer name in the model.

        :return: a List with embedded weights, one entry per column
        """
        columns_to_skip = list(self.numerical_categories or []) + [self.target_name]

        return [self._get_weights_from_layer(column)
                for column in utility.get_all_columns_except(self.df, columns_to_skip)]

    def _get_weights_from_layer(self, layer_name: str) -> np.ndarray:
        """
        This method extracts the weights of a single named layer.

        :param layer_name: name of the layer
        :return: the first weight array of the layer
        """
        return self.model.get_layer(layer_name).get_weights()[0]

    def _artifact_path(self, name: str) -> Path:
        """Return ``model_path / name``; ``model_path`` may be a str or a Path."""
        return Path(self.model_path) / name

    def get_weights_path(self) -> Path:
        """
        Used to return the path of the stored weights.

        :return: the path of the stored weights on disk
        """
        return self._artifact_path(self.DEFAULT_WEIGHTS_FILENAME)

    def get_labels_path(self) -> Path:
        """
        Used to return the path of the stored labels.

        :return: the path of the stored labels on disk
        """
        return self._artifact_path(self.DEFAULT_LABELS_FILENAME)

    def get_scaler_path(self) -> Path:
        """
        Used to return the path of the stored scaler.

        :return: the path of the stored scaler on disk
        """
        return self._artifact_path(self.DEFAULT_SCALER_FILENAME)

    def get_visualizations_dir(self) -> Path:
        """
        Used to return the path of the stored visualizations.

        :return: the path of the stored visualizations on disk
        """
        return self._artifact_path(self.DEFAULT_PATH_VISUALIZATIONS)

    @staticmethod
    def _dump_pickle(path: Path, payload) -> None:
        """Pickle ``payload`` to ``path``, creating the parent directory first."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(payload, f, pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def _load_pickle(path: Path):
        """Load a pickled object from ``path``."""
        # NOTE(security): pickle can execute arbitrary code while loading;
        # only read files that were produced by this class / a trusted source.
        with open(path, 'rb') as f:
            return pickle.load(f)

    def save_weights(self, weights: List) -> None:
        """
        This method saves a list of weights.

        :param weights: weights to save
        :return: None
        """
        self._dump_pickle(self.get_weights_path(), weights)

    def save_labels(self, labels: List) -> None:
        """
        This method saves a list of labels.

        :param labels: labels to save
        :return: None
        """
        self._dump_pickle(self.get_labels_path(), labels)

    def make_visualizations_from_network(self, extension: str = 'pdf') -> List[Figure]:
        """
        This method makes visualizations of the embedded weights for each
        categorical variable.

        The labels and weights are read back from the pickle files written
        by :meth:`save_labels` and :meth:`save_weights`.

        :param extension: extension to use
        :return: a List with Figure objects
        """
        labels = self._load_pickle(self.get_labels_path())
        embeddings = self._load_pickle(self.get_weights_path())
        n_numerical_cols = len(self.numerical_categories)

        return utility.make_visualizations(labels,
                                           embeddings,
                                           self.df,
                                           self.get_visualizations_dir(),
                                           extension,
                                           n_numerical_cols)