Commit 950f302d authored by thecml's avatar thecml

added tests for file reader and writer

parent 5e3615d8
Pipeline #94167 failed in 4 minutes and 22 seconds
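The commit message mentions new tests for the file reader and writer. A minimal sketch of what a round-trip test over the refactored buffer-based API might look like (the test name and sample data are hypothetical, not taken from this commit):

    import pandas as pd
    from io import StringIO
    from tools import file_reader, file_writer

    def test_write_and_read_csv_roundtrip():
        # Illustrative frame; the column names are made up for the example.
        df = pd.DataFrame({'CitizenId': ['1', '2'], 'Fall': [0, 1]})

        # write_csv now targets a file-like object instead of a (path, name) pair,
        # so the test never has to touch the filesystem.
        outfile = StringIO()
        file_writer.write_csv(df, outfile)

        # Rewind and read the same buffer back through the refactored reader.
        outfile.seek(0)
        df_read = file_reader.read_csv(outfile, converters={'CitizenId': str})

        assert list(df_read.columns) == ['CitizenId', 'Fall']
        assert len(df_read) == 2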
@@ -31,37 +31,46 @@ def prepare_data(X, y, settings):
def load_data_embedded(case, settings):
if case == "Complete":
dl = data_loader.CompleteDataLoader("complete_emb.csv", settings).load_data()
dl = data_loader.CompleteDataLoader(pt.PROCESSED_DATA_DIR,
"complete_emb.csv", settings).load_data()
X, y = dl.get_data()
elif case == "Compliance":
dl = data_loader.ComplianceDataLoader("compliance_emb.csv", settings).load_data()
dl = data_loader.ComplianceDataLoader(pt.PROCESSED_DATA_DIR,
"compliance_emb.csv", settings).load_data()
X, y = dl.get_data()
else:
dl = data_loader.FallDataLoader("fall_emb.csv", settings).load_data()
dl = data_loader.FallDataLoader(pt.PROCESSED_DATA_DIR,
"fall_emb.csv", settings).load_data()
X, y = dl.get_data()
return X, y
def load_data_count(case, settings):
if case == "Complete":
dl = data_loader.CompleteDataLoader("complete_count.csv", settings).load_data()
dl = data_loader.CompleteDataLoader(pt.PROCESSED_DATA_DIR,
"complete_count.csv", settings).load_data()
X, y = dl.get_data()
elif case == "Compliance":
dl = data_loader.ComplianceDataLoader("compliance_count.csv", settings).load_data()
dl = data_loader.ComplianceDataLoader(pt.PROCESSED_DATA_DIR,
"compliance_count.csv", settings).load_data()
X, y = dl.get_data()
else:
dl = data_loader.FallDataLoader("fall_count.csv", settings).load_data()
dl = data_loader.FallDataLoader(pt.PROCESSED_DATA_DIR,
"fall_count.csv", settings).load_data()
X, y = dl.get_data()
return X, y
def load_data_ohe(case, settings):
if case == "Complete":
dl = data_loader.CompleteDataLoader("complete_ohe.csv", settings).load_data()
dl = data_loader.CompleteDataLoader(pt.PROCESSED_DATA_DIR,
"complete_ohe.csv", settings).load_data()
X, y = dl.get_data()
elif case == "Compliance":
dl = data_loader.ComplianceDataLoader("compliance_ohe.csv", settings).load_data()
dl = data_loader.ComplianceDataLoader(pt.PROCESSED_DATA_DIR,
"compliance_ohe.csv", settings).load_data()
X, y = dl.get_data()
else:
dl = data_loader.FallDataLoader("fall_ohe.csv", settings).load_data()
dl = data_loader.FallDataLoader(pt.PROCESSED_DATA_DIR,
"fall_ohe.csv", settings).load_data()
X, y = dl.get_data()
return X, y
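The three loaders above now take the directory as an explicit first argument instead of assuming pt.PROCESSED_DATA_DIR internally. A short sketch of the new call pattern; pointing it at the new pt.TESTS_FILES_DIR for fixture files is presumably the motivation, though the diff does not say so explicitly:

    from tools import data_loader
    from utility.settings import load_settings
    import paths as pt   # the exact import path of the "pt" alias is assumed

    settings = load_settings(pt.CONFIGS_DIR, 'fall.yaml')
    # The directory is injected, so tests can substitute pt.TESTS_FILES_DIR.
    dl = data_loader.FallDataLoader(pt.PROCESSED_DATA_DIR, 'fall_emb.csv', settings).load_data()
    X, y = dl.get_data()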
@@ -71,8 +80,8 @@ def main():
metrics = ['accuracy', 'precision', 'recall', 'roc_auc', 'average_precision', 'f1']
cases = ["Complete", "Compliance", "Fall"]
for case in cases:
target_settings = load_settings(f'{case.lower()}.yaml')
data_settings = load_settings("data.yaml")
target_settings = load_settings(pt.CONFIGS_DIR, f'{case.lower()}.yaml')
data_settings = load_settings(pt.CONFIGS_DIR, "data.yaml")
output_filename = f"{case} model baseline.csv"
header = ['clf', 'version', 'accuracy_mean', 'accuracy_std',
'precision_mean', 'precision_std', 'recall_mean',
......
@@ -9,6 +9,8 @@ from pathlib import Path
from sklearn.decomposition import PCA
import tensorflow as tf
from sklearn.model_selection import StratifiedKFold
from io import StringIO
import shutil
USE_CROSS_VALID = False
USE_GROUPING = False
@@ -16,7 +18,7 @@ ENABLE_EMB_VIZ = False
def main(ats_resolution: int = None):
for label_name in ["Complete", "Compliance", "Fall"]:
data_settings = load_settings('data.yaml')
data_settings = load_settings(pt.CONFIGS_DIR, 'data.yaml')
if ats_resolution == None:
ats_resolution = data_settings['ats_resolution']
@@ -31,7 +33,7 @@ def main(ats_resolution: int = None):
ats_cols = [str(i)+'Ats' for i in range(1, ats_resolution+1)]
df = df.drop(ats_cols, axis=1)
target_settings = load_settings(f'{label_name.lower()}.yaml')
target_settings = load_settings(pt.CONFIGS_DIR, f'{label_name.lower()}.yaml')
model_path = Path.joinpath(pt.ROOT_DIR, target_settings['model_path'])
df_enc = encode_dataframe(df=df_to_enc,
target_name=target_settings['target_name'],
@@ -44,8 +46,15 @@ def main(ats_resolution: int = None):
model_path=model_path)
df = pd.concat([df.drop(label_name, axis=1), df_enc, df.pop(label_name)], axis=1)
file_writer.write_csv(df, pt.PROCESSED_DATA_DIR, f'{label_name.lower()}_emb.csv')
outfile = StringIO()
file_path = pt.PROCESSED_DATA_DIR
file_name = f'{label_name.lower()}_emb.csv'
with open(Path.joinpath(file_path, file_name), 'w', newline='') as fd:
file_writer.write_csv(df, outfile)
outfile.seek(0)
shutil.copyfileobj(outfile, fd)
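The same StringIO-then-copyfileobj block recurs in several scripts touched by this commit. A hedged sketch of a small helper that could centralise the pattern (the helper name is hypothetical and not part of the commit):

    import shutil
    from io import StringIO
    from pathlib import Path

    def flush_csv_buffer(df, file_path: Path, file_name: str, writer_fn) -> None:
        # Render the frame into an in-memory buffer with the buffer-based writer,
        # then copy the finished buffer to disk in one place.
        outfile = StringIO()
        writer_fn(df, outfile)
        outfile.seek(0)
        with open(Path.joinpath(file_path, file_name), 'w', newline='') as fd:
            shutil.copyfileobj(outfile, fd)

The block above would then shrink to a single call such as flush_csv_buffer(df, pt.PROCESSED_DATA_DIR, f'{label_name.lower()}_emb.csv', file_writer.write_csv).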
def encode_dataframe(df, target_name, batch_size, train_ratio, epochs,
optimizer, network_layers, verbose, model_path):
X_train, X_val, y_train, y_val, labels = preprocessor.prepare_data_for_embedder(df,
@@ -75,65 +84,15 @@ def encode_dataframe(df, target_name, batch_size, train_ratio, epochs,
Y = pca.fit_transform(embeddings_column)
y_array = np.concatenate(Y)
mapping = dict(zip(labels_column.classes_, y_array))
file_writer.write_mapping(mapping,
Path.joinpath(pt.PROCESSED_DATA_DIR, 'embeddings'),
f'{target_name.lower()}_{column}.csv')
df_to_enc[column] = df_to_enc[column].replace(to_replace=mapping)
return df_to_enc
def encode_dataframe_cv(df, target_name, batch_size, train_ratio,
epochs, network_layers, verbose, model_path):
X, y = preprocessor.get_X_y(df, target_name)
X, labels = preprocessor.encode_vector_label(X)
y = np.array(y)
network = neural_embedder.NeuralEmbedder(df=df, target_name=target_name,
epochs=epochs,
batch_size=batch_size,
network_layers=network_layers,
verbose=verbose, model_path=model_path)
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
mode='min',
patience=3,
verbose=0)
weights = []
for train_index, valid_index in skf.split(X, y):
X_train, X_valid = X[train_index,:], X[valid_index,:]
y_train, y_valid = y[train_index], y[valid_index]
_ = network.fit(X_train, y_train, X_valid, y_valid,
callbacks=[es_callback])
embedded_weights = network.get_embedded_weights()
weights.append(embedded_weights)
new_weights = list()
for weights_list_tuple in zip(*weights):
new_weights.append(
[np.array(weights_).mean(axis=0)\
for weights_ in zip(*weights_list_tuple)])
network.save_weights(new_weights)
network.save_labels(labels)
outfile = StringIO()
file_path = pt.PROCESSED_EMB_DATA_DIR
file_name = f'{target_name.lower()}_{column}.csv'
with open(Path.joinpath(file_path, file_name), 'w', newline='', encoding='utf8') as fd:
file_writer.write_embedding(mapping, outfile)
outfile.seek(0)
shutil.copyfileobj(outfile, fd)
if ENABLE_EMB_VIZ:
network.make_visualizations_from_network(extension='png')
df_to_enc = df.drop(target_name, axis=1)
for index in range(df_to_enc.shape[1]):
column = df_to_enc.columns[index]
labels_column = labels[index]
embeddings_column = new_weights[index]
pca = PCA(n_components=1)
Y = pca.fit_transform(embeddings_column)
y_array = np.concatenate(Y)
mapping = dict(zip(labels_column.classes_, y_array))
file_writer.write_mapping(mapping,
Path.joinpath(pt.PROCESSED_DATA_DIR, 'embeddings'),
f'{target_name.lower()}_{column}.csv')
df_to_enc[column] = df_to_enc[column].replace(to_replace=mapping)
return df_to_enc
......
@@ -4,19 +4,22 @@ from tools import file_reader, file_writer, labeler
from tools import preprocessor
from utility.settings import load_settings
import pandas as pd
from io import StringIO
import shutil
from pathlib import Path
def main(ats_resolution: int = None):
screenings = file_reader.read_csv(pt.INTERIM_DATA_DIR, 'screenings.csv',
converters={'CitizenId': str})
data_settings = load_settings('data.yaml')
data_settings = load_settings(pt.CONFIGS_DIR, 'data.yaml')
if ats_resolution == None:
ats_resolution = data_settings['ats_resolution']
df = screenings.copy()
accum_screenings = labeler.accumulate_screenings(df, data_settings)
for label_name in ['Complete', 'Compliance', 'Fall']:
target_settings = load_settings(f'{label_name.lower()}.yaml')
target_settings = load_settings(pt.CONFIGS_DIR, f'{label_name.lower()}.yaml')
features = target_settings['features']
# Encode target label
@@ -39,8 +42,14 @@ def main(ats_resolution: int = None):
ats = file_reader.read_csv(pt.REFERENCES_DIR, 'ats.csv',
converters={'ats_id': str})
df = preprocessor.replace_cat_values(df, ats)
file_writer.write_csv(df, pt.PROCESSED_DATA_DIR, f'{label_name.lower()}.csv')
outfile = StringIO()
file_path = pt.PROCESSED_DATA_DIR
file_name = f'{label_name.lower()}.csv'
with open(Path.joinpath(file_path, file_name), 'w', newline='') as fd:
file_writer.write_csv(df, outfile)
outfile.seek(0)
shutil.copyfileobj(outfile, fd)
if __name__ == "__main__":
main()
\ No newline at end of file
@@ -10,7 +10,7 @@ import yaml
def main():
df = file_reader.read_pickle(pt.INTERIM_DATA_DIR, 'ats.pkl')
settings = load_settings("data.yaml")
settings = load_settings(pt.CONFIGS_DIR, "data.yaml")
ats_iso_length = settings['ats_iso_length']
df['DevISOClass'] = df['DevISOClass'].apply(lambda x: x[:ats_iso_length]) # limit ats iso length
@@ -45,7 +45,7 @@ def main():
df = df[df.groupby('CitizenId').IsAlarmLend.transform(lambda s: s.ne(1).cumprod().astype(bool))]
# Make features
lends = df[['CitizenId', 'DevISOClass', 'LendDate', 'ReturnDate']]
lends = df[['CitizenId', 'DevISOClass', 'LendDate', 'ReturnDate']].copy(deep=True)
lends['LendDiff'] = lends['LendDate'] - lends['ReturnDate']
loan_period = lends.groupby('CitizenId')['LendDiff'].apply(
lambda x: abs(x.mean().total_seconds()) // (24 * 3600)).reset_index()
@@ -116,9 +116,9 @@ def main():
dtype=[('Status', 'bool'), ('Days_to_alarm', '>i4')])
data_x = preprocessor.split_cat_columns(x_df, col_to_split='Ats', tag='Ats',
resolution=ats_resolution)
file_writer.write_array(data_y, pt.PROCESSED_DATA_DIR, "alarm_labels.npy")
file_writer.write_csv(data_x, pt.PROCESSED_DATA_DIR, "alarm_features.csv")
data_dict = {'x': data_x, 'y': data_y}
file_writer.write_pickle(data_dict, pt.PROCESSED_DATA_DIR, "alarm_data.pkl")
if __name__ == "__main__":
main()
\ No newline at end of file
@@ -10,12 +10,11 @@ from sksurv.ensemble import RandomSurvivalForest
USE_LABEL_ENC = True
def main():
settings = load_settings("data.yaml")
settings = load_settings(pt.CONFIGS_DIR, "data.yaml")
ats_resolution = settings['ats_resolution']
converters = {str(i)+'Ats':str for i in range(1, ats_resolution+1)}
data_x = file_reader.read_csv(pt.PROCESSED_DATA_DIR, "alarm_features.csv",
converters=converters)
data_y = file_reader.read_array(pt.PROCESSED_DATA_DIR, "alarm_labels.npy")
data = file_reader.read_pickle(pt.PROCESSED_DATA_DIR, "alarm_data.pkl")
data_x = data['x']
data_y = data['y']
if USE_LABEL_ENC:
labels_enc = dict()
......
@@ -20,12 +20,11 @@ CONFIGS_DIR = Path.joinpath(ROOT_DIR, 'configs')
REFERENCES_DIR = Path.joinpath(ROOT_DIR, 'references')
REPORTS_DIR = Path.joinpath(ROOT_DIR, 'reports')
REPORTS_PLOTS_DIR = Path.joinpath(ROOT_DIR, 'reports/plots')
LOGS_DIR = Path.joinpath(ROOT_DIR, 'src/logs')
CONFIG_DIR = Path.joinpath(ROOT_DIR, 'src/cfg')
TESTS_FILES_DIR = Path.joinpath(ROOT_DIR, 'tests/files')
RAW_DATA_DIR_2019 = Path.joinpath(ROOT_DIR, 'data/raw/2019')
RAW_DATA_DIR_2020 = Path.joinpath(ROOT_DIR, 'data/raw/2020')
PROCESSED_DATA_DIR = Path.joinpath(ROOT_DIR, 'data/processed')
PROCESSED_EMB_DATA_DIR = Path.joinpath(ROOT_DIR, 'data/processed/embeddings')
INTERIM_DATA_DIR = Path.joinpath(ROOT_DIR, 'data/interim')
EXTERNAL_DATA_DIR = Path.joinpath(ROOT_DIR, 'data/external')
CLUSTERS_DIR = Path.joinpath(ROOT_DIR, 'models/clusters')
......
@@ -8,12 +8,19 @@ from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from scipy.stats import skew, boxcox
from typing import Tuple, List
from pathlib import Path
from io import BytesIO, StringIO
import shutil
class BaseDataLoader(ABC):
def __init__(self, file_name: str,
def __init__(self,
file_path: Path,
file_name: str,
settings,
converters=None):
"""Initilizer method that takes a file name, settings and optionally a converter"""
"""Initilizer method that takes a file path, file name,
settings and optionally a converter"""
self.file_path = file_path
self.file_name = file_name
self.settings = settings
self.converters = converters
@@ -80,9 +87,12 @@ class BaseDataLoader(ABC):
class CompleteDataLoader(BaseDataLoader):
def load_data(self):
df = file_reader.read_csv(pt.PROCESSED_DATA_DIR,
self.file_name,
converters=self.converters)
infile = StringIO()
with open(Path.joinpath(self.file_path,
self.file_name), 'r', newline='') as fd:
shutil.copyfileobj(fd, infile)
infile.seek(0)
df = file_reader.read_csv(infile, converters=self.converters)
X = df.drop(['Complete'], axis=1)
y = df['Complete']
self.X = X
@@ -91,33 +101,41 @@ class CompleteDataLoader(BaseDataLoader):
class ComplianceDataLoader(BaseDataLoader):
def load_data(self):
df = file_reader.read_csv(pt.PROCESSED_DATA_DIR,
self.file_name,
converters=self.converters)
infile = StringIO()
with open(Path.joinpath(self.file_path,
self.file_name), 'r', newline='') as fd:
shutil.copyfileobj(fd, infile)
infile.seek(0)
df = file_reader.read_csv(infile, converters=self.converters)
X = df.drop(['Compliance'], axis=1)
y = df['Compliance']
self.X = X
self.y = y
return self
class AlarmDataLoader(BaseDataLoader):
class FallDataLoader(BaseDataLoader):
def load_data(self):
df = file_reader.read_csv(pt.PROCESSED_DATA_DIR,
self.file_name,
converters=self.converters)
X = df.drop(['Alarm'], axis=1)
y = df['Alarm']
infile = StringIO()
with open(Path.joinpath(self.file_path,
self.file_name), 'r', newline='') as fd:
shutil.copyfileobj(fd, infile)
infile.seek(0)
df = file_reader.read_csv(infile, converters=self.converters)
X = df.drop(['Fall'], axis=1)
y = df['Fall']
self.X = X
self.y = y
return self
class FallDataLoader(BaseDataLoader):
class AlarmDataLoader(BaseDataLoader):
def load_data(self):
df = file_reader.read_csv(pt.PROCESSED_DATA_DIR,
self.file_name,
converters=self.converters)
X = df.drop(['Fall'], axis=1)
y = df['Fall']
infile = BytesIO()
with open(Path.joinpath(self.file_path, self.file_name), 'rb') as fd:
shutil.copyfileobj(fd, infile)
infile.seek(0)
data = file_reader.read_pickle(infile)
X = data['x']
y = data['y']
self.X = X
self.y = y
return self
\ No newline at end of file
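AlarmDataLoader now reads the single alarm_data.pkl produced by the dataset script instead of the earlier CSV/NumPy pair. A brief usage sketch under the new constructor (which settings file applies to the alarm case is an assumption here):

    from tools import data_loader
    from utility.settings import load_settings
    import paths as pt   # the exact import path of the "pt" alias is assumed

    settings = load_settings(pt.CONFIGS_DIR, 'data.yaml')
    dl = data_loader.AlarmDataLoader(pt.PROCESSED_DATA_DIR, 'alarm_data.pkl', settings).load_data()
    # X is the feature frame, y the structured array with Status / Days_to_alarm.
    X, y = dl.get_data()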
@@ -5,66 +5,64 @@ import numpy as np
import joblib
import pickle
import csv
from io import StringIO, BytesIO
def read_embedding(file_path: Path, file_name: str) -> dict:
def read_csv(infile: StringIO, header: str='infer',
sep: str=',', usecols: List[int]=None,
names: List[str]=None, converters: dict=None) -> pd.DataFrame:
"""
This method reads a csv file using the pandas read_csv() method
:param infile: file-like object to read from
:param header: file header
:param sep: separator identifier
:param usecols: list of column indices to use
:param names: list of column names to use
:param converters: dict of converters to use
:return: the csv file as a dataframe
"""
return pd.read_csv(infile, header=header, sep=sep, usecols=usecols,
names=names, converters=converters)
def read_embedding(infile: StringIO) -> dict:
"""
This method reads an embedding file from disk
:param infile: file-like object to read from
:return: the embedding as a dict
"""
with open(Path.joinpath(file_path, file_name), 'r') as f:
reader = csv.reader(f)
embedding_dict = {rows[0]:rows[1] for rows in reader}
reader = csv.reader(infile)
embedding_dict = {rows[0]:rows[1] for rows in reader}
return embedding_dict
def read_array(file_path: Path, file_name: str) -> np.ndarray:
def read_array(infile: BytesIO) -> np.ndarray:
"""
This method reads a NumPy array file
:param infile: binary buffer to read from
:return: the NumPy array object
"""
return np.load(Path.joinpath(file_path, file_name))
return np.load(infile)
def read_pickle(file_path: Path, file_name: str) -> any:
def read_pickle(infile: BytesIO) -> any:
"""
This method reads any file stored as a pickle
:param infile: binary buffer to read from
:return: the file object
"""
with open(Path.joinpath(file_path, file_name), 'rb') as f:
data = pickle.load(f)
data = pickle.load(infile)
return data
def read_joblib(file_path: Path, file_name: str) -> any:
def read_joblib(infile: BytesIO) -> any:
"""
This method reads a joblib file from disk
:param infile: binary buffer to read from
:return: the joblib file
"""
return joblib.load(Path.joinpath(file_path, file_name))
return joblib.load(infile)
def read_csv(file_path: Path, file_name: str, header: str='infer',
sep: str=',', names: List[str]=None,
converters: dict=None) -> pd.DataFrame:
"""
This method reads a csv file using Pandas read_csv() method
:param file_path: path of the file
:param file_name: name of the file
:param header: file header
:param sep: seperator identifier
:param names: list of column names to use
:param converters: dict of converters to use
:return: the csv file
"""
return pd.read_csv(Path.joinpath(file_path, file_name), header=header,
sep=sep, names=names, converters=converters)
def read_excelfile(file_path: Path, file_name: str,
converters: dict=None) -> pd.DataFrame:
def read_excelfile(infile: BytesIO, converters: dict=None) -> pd.DataFrame:
"""
This method reads an excel file from disk
:param infile: binary buffer to read from
@@ -72,22 +70,22 @@ def read_excelfile(file_path: Path, file_name: str,
:param converters: dict of converters to use
:return: the excel file as a dataframe
"""
df = pd.read_excel(Path.joinpath(file_path, file_name),
df = pd.read_excel(infile,
engine='openpyxl',
converters=converters)
return df
def read_excelfile_sheets(file_path: Path, file_name: str, n_sheets: int,
def read_excelfile_sheets(infile: BytesIO, n_sheets: int,
converters: dict=None) -> pd.DataFrame:
"""
This method reads sheets from an excel file
:param infile: binary buffer to read from
:param n_sheets: number of sheets to read
:param converters: dict of converters to use
:return: the full excel file as a dataframe
"""
file = pd.ExcelFile(Path.joinpath(file_path, file_name), engine='openpyxl')
file = pd.ExcelFile(infile, engine='openpyxl')
full_file = pd.DataFrame()
for i in range(n_sheets):
df = file.parse(file.sheet_names[i], converters=converters)
......
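With the readers above now taking file-like objects, the binary helpers can be exercised entirely in memory as well. A minimal round-trip sketch for read_array and read_pickle (the sample data are illustrative):

    import pickle
    import numpy as np
    from io import BytesIO
    from tools import file_reader

    # NumPy round-trip: np.save fills the buffer, read_array loads it back.
    buf = BytesIO()
    np.save(buf, np.arange(5))
    buf.seek(0)
    arr = file_reader.read_array(buf)

    # Pickle round-trip using the same buffer-based pattern.
    buf = BytesIO()
    pickle.dump({'x': [1, 2], 'y': [0, 1]}, buf)
    buf.seek(0)
    data = file_reader.read_pickle(buf)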
import os
from pathlib import Path
import pandas as pd
from typing import List
import matplotlib.pyplot as plt
@@ -10,10 +9,10 @@ import itertools
import pickle
from sklearn.metrics import roc_curve, confusion_matrix
from utility.metrics import annotate_heatmap
from io import StringIO, BytesIO
def write_csv(df: pd.DataFrame,
file_path: Path,
file_name: str,
outfile: StringIO,
date_format: str='%d-%m-%Y',
index: bool=False) -> None:
"""
@@ -25,11 +24,9 @@ def write_csv(df: pd.DataFrame,
:param index: write row names (index)
:return: None
"""
df.to_csv(Path.joinpath(file_path, file_name),
date_format=date_format,
index=index)
df.to_csv(outfile, date_format=date_format, index=index)
def write_mapping(mapping: dict, file_path: Path, file_name: str) -> None:
def write_embedding(mapping: dict, outfile: StringIO) -> None:
"""
This method writes an embedding mapping to disk as a csv file
:param mapping: mapping dict
@@ -38,15 +35,12 @@ def write_mapping(mapping: dict, file_path: Path, file_name: str) -> None:
:return: None
"""
field_names = ['Ats', 'Embedding']
with open(Path.joinpath(file_path, file_name), 'w',
newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(field_names)
for key, value in mapping.items():
writer.writerow([key, value])
f.close()
writer = csv.writer(outfile)
writer.writerow(field_names)
for key, value in mapping.items():
writer.writerow([key, value])
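write_mapping is renamed to write_embedding and, like read_embedding, now works on a StringIO buffer, which makes a round trip straightforward to test. A short sketch with made-up mapping values:

    from io import StringIO
    from tools import file_reader, file_writer

    mapping = {'ats_1': 0.12, 'ats_2': -0.34}   # illustrative values

    outfile = StringIO()
    file_writer.write_embedding(mapping, outfile)

    # Rewind and parse the buffer back into a dict. Note that read_embedding
    # keeps the header row, so the result also contains 'Ats' -> 'Embedding'.
    outfile.seek(0)
    embedding = file_reader.read_embedding(outfile)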
def write_array(data, file_path, file_name) -> None:
def write_array(data: np.ndarray, outfile: BytesIO) -> None:
"""
This method writes a NumPy array to disk
:param data: data to write
@@ -54,9 +48,9 @@ def write_array(data, file_path, file_name) -> None:
:param file_name: name of the file
:return: None
"""
np.save(Path.joinpath(file_path, file_name), data)
np.save(outfile, data)
def write_pickle(data: any, file_path: Path, file_name: str) -> None:
def write_pickle(data: any, outfile: BytesIO) -> None:
"""
This method writes a pickle file to disk
:param data: data to write
@@ -64,12 +58,9 @@ def write_pickle(data: any, file_path: Path, file_name: str) -> None:
:param file_name: name of the file
:return: None
"""
with open(Path.joinpath(file_path, file_name), 'wb') as f:
pickle.dump(data, f)
pickle.dump(data, outfile)
def write_joblib(data: any,
file_path: Path,
file_name: str) -> None:
def write_joblib(data: any, outfile: BytesIO) -> None:
"""
This method writes a joblib file to disk
:param data: data to write
@@ -77,11 +68,12 @@ def write_joblib(data: any,
:param file_name: name of the file
:return: None
"""
joblib.dump(data, Path.joinpath(file_path, file_nam