"""
embedder.py
====================================
Utility embedder functions.
"""
import os
import shutil
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import numpy as np
import pandas as pd
from typing import List, Tuple
from matplotlib.figure import Figure
from sklearn.preprocessing import LabelEncoder
from sklearn.decomposition import PCA
from pandas.api.types import is_numeric_dtype
from tools import preprocessor, neural_embedder
from utility.data import write_embedding
import paths as pt
# %-style template for the title of an embedding scatter plot; filled with the column name.
TITLE_FORMAT = 'Weights for %s'
# %-style template for the file name of a saved scatter plot; filled with (column name, file extension).
SCATTER_EMBEDDINGS_FORMAT = '%s_embedding.%s'
# %-style template for the file name of the saved loss curve; filled with the file extension.
PLOT_LOSS_FORMAT = 'loss_epochs.%s'
def encode_dataframe(df: pd.DataFrame,
                     target_name: str,
                     metrics: List[str],
                     batch_size: int,
                     train_ratio: float,
                     epochs: int,
                     optimizer: str,
                     network_layers: List[int],
                     verbose: bool,
                     model_path: str,
                     enable_emb_viz: bool) -> pd.DataFrame:
    """
    Replace the values of every non-target column with a one-dimensional
    projection of that column's learned entity embedding.

    A NeuralEmbedder is fitted on the prepared data. For each column, the
    embedding weights are reduced to a single PCA component, the resulting
    class-to-value mapping is handed to ``write_embedding`` and the column
    values are substituted through that mapping.

    :param df: dataframe to encode
    :param target_name: the label name
    :param metrics: a list of metrics to use
    :param batch_size: batch size to use
    :param train_ratio: the train/test split ratio
    :param epochs: number of epochs
    :param optimizer: optimizer to use
    :param network_layers: a list with sizes of network layers, e.g. (32, 32)
    :param verbose: verbose execution flag
    :param model_path: where to store the model
    :param enable_emb_viz: when set, the network also produces its
        visualizations (as 'png')
    :return: a new dataframe without the target column, with the remaining
        columns replaced through their class-to-value mappings
    """
    X_train, X_val, y_train, y_val, labels = preprocessor.prepare_data_for_emb(
        df, target_name, train_ratio)

    network = neural_embedder.NeuralEmbedder(df=df,
                                             target_name=target_name,
                                             metrics=metrics,
                                             epochs=epochs,
                                             batch_size=batch_size,
                                             network_layers=network_layers,
                                             optimizer_fn=optimizer,
                                             verbose=verbose,
                                             model_path=model_path)
    network.fit(X_train, y_train, X_val, y_val)
    embedded_weights = network.get_embedded_weights()

    if enable_emb_viz:
        network.make_visualizations_from_network(extension='png')

    encoded = df.drop(target_name, axis=1)
    # Snapshot the column labels up front; the loop only overwrites existing columns.
    for position, column in enumerate(list(encoded.columns)):
        encoder = labels[position]
        weights = embedded_weights[position]

        # Collapse the embedding to one component, then flatten the (n, 1)
        # result so each class maps to a single scalar.
        projected = PCA(n_components=1).fit_transform(weights)
        class_to_value = dict(zip(encoder.classes_, np.concatenate(projected)))

        write_embedding(class_to_value,
                        pt.PROCESSED_EMB_DATA_DIR,
                        f'{target_name.lower()}_{column}.csv')
        encoded[column] = encoded[column].replace(to_replace=class_to_value)
    return encoded
def get_embedding_size(unique_values: int) -> int:
    """
    Compute the width of the Embedding layer for one category.

    The size is half the number of unique values, rounded up, capped at 50
    and never smaller than 2.

    :param unique_values: the number of unique values in the given category
    :return: the size to be used on the embedding layer
    """
    half_rounded_up = np.ceil(unique_values / 2)
    capped = int(min(half_rounded_up, 50))
    return max(capped, 2)
def get_numerical_cols(df: pd.DataFrame, target_name: str) -> List:
    """
    Generate a list of the numerical columns of a dataframe.

    :param df: the dataframe whose columns are inspected
    :param target_name: name of the target column, which is always left out
    :return: the names of all columns other than the target that have a
        numeric dtype, in dataframe column order
    """
    return [column for column in df
            if column != target_name and is_numeric_dtype(df[column])]
def check_weights_output(weights_output: str) -> None:
    """
    Checks that an output location for the embedding weights is set

    :param weights_output: output file for the embedding weights
    :raises ValueError: if ``weights_output`` is empty or otherwise falsy
    """
    if weights_output:
        return
    raise ValueError("You should provide a output file for the embeddings weights")
def series_to_list(series: pd.Series) -> List:
    """
    Convert a given pd.Series object into a list.

    :param series: the Series to be converted
    :return: a new list containing all the elements of the Series, in order
    """
    # list() consumes the same iterator the explicit loop used.
    return list(series)
def sample(X: np.ndarray, y: np.ndarray, n: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    Draw ``n`` random rows from ``X`` together with the matching entries of ``y``.

    Row indices are drawn with ``np.random.randint``, so the same row may be
    selected more than once.

    :param X: array to sample rows from (indexed along its first axis)
    :param y: array indexed with the same row indices as ``X``
    :param n: number of indices to draw
    :return: the selected rows of ``X`` and the selected entries of ``y``
    """
    picked = np.random.randint(X.shape[0], size=n)
    return X[picked, :], y[picked]
def get_X_y(df: pd.DataFrame, name_target: str) -> Tuple[List, List]:
    """
    Split a dataframe row by row into feature lists and integer targets.

    :param df: the dataframe to be used as source
    :param name_target: the name of the target variable
    :return: a list holding, per row, the list of all values except the
        target, and a list holding, per row, the target converted to int
    """
    rows = [record for _, record in df.iterrows()]
    X_list = [list(record.drop(name_target)) for record in rows]
    y_list = [int(record[name_target]) for record in rows]
    return X_list, y_list
def transpose_to_list(X: np.ndarray) -> List[np.ndarray]:
    """
    Split an array into a list of single-column arrays.

    :param X: the ndarray to be used as source
    :return: a list with one array per index ``i`` in ``range(X.shape[1])``;
        each entry is ``X[..., [i]]``, so the indexed axis is kept with
        length one
    """
    # Indexing with a one-element list (rather than a bare int) keeps the axis.
    return [X[..., [index]] for index in range(X.shape[1])]
def make_plot_from_history(history: tf.keras.callbacks.History,
                           output_path: str=None,
                           extension: str='pdf') -> Figure:
    """
    Build a Figure showing the loss curve over the epochs.

    :param history: the history outputted from the model.fit method; its
        ``history['loss']`` entry is plotted
    :param output_path: (optional) directory where the image will be saved;
        it is created if it does not exist
    :param extension: (optional) the extension of the file
    :return: a Figure object containing the plot
    """
    # Read the losses first so a missing 'loss' entry fails before a figure exists.
    loss_values = history.history['loss']

    fig, ax = plt.subplots(figsize=(10, 10))
    ax.set_xlabel("Epochs")
    ax.set_ylabel("Loss")
    ax.plot(loss_values)

    if output_path:
        os.makedirs(output_path, exist_ok=True)
        target_file = os.path.join(output_path, PLOT_LOSS_FORMAT % extension)
        fig.savefig(target_file)
    return fig
def make_visualizations(labels: List[LabelEncoder],
                        embeddings: List[np.array],
                        df: pd.DataFrame,
                        output_path: str=None,
                        extension: str='pdf',
                        n_numerical_cols: int=None) -> List[Figure]:
    """
    Generate a 2-D scatter plot of the embedding of each categorical variable.

    The columns considered are those of ``df`` from position
    ``n_numerical_cols`` (or 0 when not given) up to, but excluding, the last
    column. Columns whose LabelEncoder holds a single class are skipped.

    :param labels: a list of the LabelEncoders of each categorical variable
    :param embeddings: a list with the embedding weights of each categorical variable
    :param df: the dataframe from where the weights were extracted
    :param output_path: (optional) directory where the visualizations will be
        saved; it is created if it does not exist
    :param extension: (optional) the extension to be used when saving the artifacts
    :param n_numerical_cols: (optional) number of leading columns to skip
    :return: the list of figures, one per plotted categorical variable
    """
    first_column = n_numerical_cols if n_numerical_cols else 0
    embedded_df = df.iloc[:, first_column:df.shape[1] - 1]

    figures = []
    for index, column in enumerate(embedded_df.columns):
        encoder = labels[index]
        if not is_not_single_embedding(encoder):
            continue

        # Project the embedding weights onto their first two principal components.
        projected = PCA(n_components=2).fit_transform(embeddings[index])

        fig = plt.figure(figsize=(10, 10))
        figures.append(fig)
        ax = fig.add_subplot(111)
        ax.scatter(projected[:, 0], projected[:, 1])
        ax.set_title(TITLE_FORMAT % column)

        # Label every point with the class it represents.
        for i, text in enumerate(encoder.classes_):
            ax.annotate(text,
                        (projected[i, 0], projected[i, 1]),
                        xytext=(-20, 10),
                        textcoords='offset points')

        if output_path:
            os.makedirs(output_path, exist_ok=True)
            file_name = SCATTER_EMBEDDINGS_FORMAT % (column, extension)
            fig.savefig(os.path.join(output_path, file_name))
    return figures
def is_not_single_embedding(label: LabelEncoder) -> bool:
    """
    Tell whether a given LabelEncoder holds more than one class

    :param label: label encoder to be checked
    :return: True when ``label.classes_`` has more than one entry, False otherwise
    """
    class_count = label.classes_.shape[0]
    return class_count > 1
def get_all_columns_except(df: pd.DataFrame, columns_to_skip: List[str]) -> pd.DataFrame:
    """
    Select every column of a dataframe whose name is not in a skip list

    :param df: dataframe to select columns from
    :param columns_to_skip: list of columns to skip
    :return: a dataframe with selected columns, in their original order
    """
    kept_columns = [name for name in df.columns if name not in columns_to_skip]
    return df.loc[:, kept_columns]
def check_not_empty_dataframe(df: pd.DataFrame) -> None:
    """
    Checks that a dataframe is not empty

    :param df: dataframe to check
    :raises ValueError: if ``df.empty`` is true
    """
    if not df.empty:
        return
    raise ValueError("You should provide a non-empty pandas dataframe")
def check_target_name(target_name: str) -> None:
    """
    Checks that a target name is set

    :param target_name: target name to check
    :raises ValueError: if ``target_name`` is empty or otherwise falsy
    """
    if target_name:
        return
    raise ValueError("You should provide a non-empty target name")
def check_target_existent_in_df(target_name: str, df: pd.DataFrame) -> None:
    """
    Checks that a target name is one of the columns of a dataframe

    :param target_name: target name to check
    :param df: dataframe to check
    :raises ValueError: if ``target_name`` is not among ``df.columns``
    """
    if target_name in df.columns:
        return
    raise ValueError("You should provide a target variable that is existent on the dataframe")
def check_train_ratio(train_ratio: float) -> None:
    """
    Checks a train ratio is strictly between zero and one

    :param train_ratio: train ratio to check
    :raises ValueError: if ``train_ratio`` is not greater than 0 and smaller
        than 1 (this includes NaN)
    """
    # Written as a positive range test: every comparison with NaN is False,
    # so an "out of range" test would let NaN through unnoticed.
    if not 0 < train_ratio < 1:
        raise ValueError("You should provide a train ratio greater than 0 and smaller than 1")
def check_epochs(epochs: int) -> None:
    """
    Checks that the number of epochs is greater than zero

    :param epochs: number of epochs to check
    :raises ValueError: if ``epochs`` is zero or negative
    """
    non_positive = epochs <= 0
    if non_positive:
        raise ValueError("You should provide a epoch greater than zero")
def check_batch_size(batch_size: int) -> None:
    """
    Checks that the batch size is greater than zero

    :param batch_size: batch size to check
    :raises ValueError: if ``batch_size`` is zero or negative
    """
    non_positive = batch_size <= 0
    if non_positive:
        raise ValueError("You should provide a batch size greater than zero")