Commit 5015902e authored by Malthe Kjær Bisbo's avatar Malthe Kjær Bisbo
Browse files

slight changes to descriptors and prior

parent dd3edadb
This source diff could not be displayed because it is too large. You can view the blob instead.
import os
import sys
import numpy as np
from math import erf
from itertools import product
from scipy.spatial.distance import cdist
# Working directory of the run.
# NOTE(review): the original read cwd = sys.argv[1] and then unconditionally
# overwrote it with os.getcwd(); the dead (and IndexError-prone, when no
# argument is given) argv read is removed, preserving the effective behavior.
cwd = os.getcwd()
class Angular_Fingerprint(object):
""" comparator for construction of angular fingerprints
def __init__(self, atoms, Rc1=4.0, Rc2=4.0, binwidth1=0.1, Nbins2=30, sigma1=0.2, sigma2=0.10, nsigma=4, eta=1, gamma=3, use_angular=True):
""" Set a common cut of radius
self.Rc1 = Rc1
self.Rc2 = Rc2
self.binwidth1 = binwidth1
self.Nbins2 = Nbins2
self.binwidth2 = np.pi / Nbins2
self.sigma1 = sigma1
self.sigma2 = sigma2
self.nsigma = nsigma
self.eta = eta
self.gamma = gamma
self.use_angular = use_angular
self.pbc = atoms.get_pbc()
self.cell = atoms.get_cell()
self.n_atoms = atoms.get_number_of_atoms()
self.num = atoms.get_atomic_numbers()
self.atomic_types = sorted(list(set(self.num)))
self.atomic_count = {type:list(self.num).count(type) for type in self.atomic_types}
self.volume = atoms.get_volume()
self.dim = 3
# parameters for the binning:
self.m1 = self.nsigma*self.sigma1/self.binwidth1 # number of neighbour bins included.
self.smearing_norm1 = erf(1/np.sqrt(2) * self.m1 * self.binwidth1/self.sigma1) # Integral of the included part of the gauss
self.Nbins1 = int(np.ceil(self.Rc1/self.binwidth1))
self.m2 = self.nsigma*self.sigma2/self.binwidth2 # number of neighbour bins included.
self.smearing_norm2 = erf(1/np.sqrt(2) * self.m2 * self.binwidth2/self.sigma2) # Integral of the included part of the gauss
self.binwidth2 = np.pi/Nbins2
Nelements_2body = self.Nbins1
Nelements_3body = self.Nbins2
if use_angular:
self.Nelements = Nelements_2body + Nelements_3body
self.Nelements = Nelements_2body
def get_feature(self, atoms):
cell = self.cell
n_atoms = self.n_atoms
pos = atoms.get_positions().tolist()
num = atoms.get_atomic_numbers()
atomic_count = self.atomic_count
return pos
def __angle(self, vec1, vec2):
Returns angle with convention [0,pi]
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
arg =,vec2)/(norm1*norm2)
# This is added to correct for numerical errors
if arg < -1:
arg = -1.
elif arg > 1:
arg = 1.
return np.arccos(arg), arg
def __f_cutoff(self, r, gamma, Rc):
Polinomial cutoff function in the, with the steepness determined by "gamma"
gamma = 2 resembels the cosine cutoff function.
For large gamma, the function goes towards a step function at Rc.
if not gamma == 0:
return 1 + gamma*(r/Rc)**(gamma+1) - (gamma+1)*(r/Rc)**gamma
return 1
def __f_cutoff_grad(self, r, gamma, Rc):
if not gamma == 0:
return gamma*(gamma+1)/Rc * ((r/Rc)**gamma - (r/Rc)**(gamma-1))
return 0
def angle2_grad(self, RijVec, RikVec):
Rij = np.linalg.norm(RijVec)
Rik = np.linalg.norm(RikVec)
a = RijVec/Rij - RikVec/Rik
b = RijVec/Rij + RikVec/Rik
A = np.linalg.norm(a)
B = np.linalg.norm(b)
D = A/B
RijMat =[:,np.newaxis], RijVec[:,np.newaxis].T)
RikMat =[:,np.newaxis], RikVec[:,np.newaxis].T)
a_grad_j = -1/Rij**3 * RijMat + 1/Rij * np.identity(3)
b_grad_j = a_grad_j
a_sum_j = np.sum(a*a_grad_j, axis=1)
b_sum_j = np.sum(b*b_grad_j, axis=1)
grad_j = 2/(1+D**2) * (1/(A*B) * a_sum_j - A/(B**3) * b_sum_j)
a_grad_k = 1/Rik**3 * RikMat - 1/Rik * np.identity(3)
b_grad_k = -a_grad_k
a_sum_k = np.sum(a*a_grad_k, axis=1)
b_sum_k = np.sum(b*b_grad_k, axis=1)
grad_k = 2/(1+D**2) * (1/(A*B) * a_sum_k - A/(B**3) * b_sum_k)
a_grad_i = -(a_grad_j + a_grad_k)
b_grad_i = -(b_grad_j + b_grad_k)
a_sum_i = np.sum(a*a_grad_i, axis=1)
b_sum_i = np.sum(b*b_grad_i, axis=1)
grad_i = 2/(1+D**2) * (1/(A*B) * a_sum_i - A/(B**3) * b_sum_i)
return grad_i, grad_j, grad_k
# Build script for the Cython-accelerated angular fingerprint feature.
# Run as: python build_ext --inplace
# NOTE(review): distutils is deprecated (PEP 632) and removed in Python 3.12;
# setuptools.setup is the drop-in replacement when upgrading.
from distutils.core import setup
from Cython.Build import cythonize
import numpy
# include_dirs exposes the NumPy C headers to the generated extension module.
setup(ext_modules=cythonize('featureCalculators/angular_fingerprintFeature_cy.pyx'), include_dirs=[numpy.get_include()])
import numpy as np
from kernel import gauss_kernel, double_gauss_kernel
from descriptor.fingerprint import Fingerprint
from prior.prior import repulsive_prior
class gpr_memory():
def __init__(self, descriptor, prior):
self.energies = None
self.features = None
self.prior_values = None
self.descriptor = descriptor
self.prior = prior
def save_data(self, atoms_list):
energies_save = np.array([a.get_potential_energy() for a in atoms_list])
features_save = descriptor.get_featureMat(atoms_list)
if self.prior is not None:
prior_values_save = np.array([ for a in atoms_list])
def save_energies(self, energies_save):
if self.energies in None:
self.energies = energies_save
self.energies = np.r_[self.energies, energies_save]
def save_features(self, features_save):
if self.features in None:
self.features = features_save
self.features = np.r_[self.features, features_save]
def save_prior_values(self, prior_values_save):
if self.prior_values in None:
self.prior_values = prior_values_save
self.prior_values = np.r_[self.prior_values, prior_values_save]
class gpr():
"""Gaussian Process Regression
......@@ -12,22 +55,37 @@ class gpr():
def __init__(self, descriptor, kernel, prior):
self.descriptor = descriptor
if kernel is None:
kernel = gauss_kernel()
def __init__(self, descriptor=None, kernel='single', prior=None):
if descriptor is None:
self.descriptor = Fingerprint()
self.descriptor = descriptor
if kernel is 'single':
self.kernel = gauss_kernel()
elif kernel is 'double':
self.kernel = double_gauss_kernel()
self.kernel = kernel
self.prior = prior
if prior is None:
self.prior = repulsive_prior()
self.prior = prior
self.memory = gpr_memory(self.descriptor, self.prior)
def predict(self, a):
x = self.descriptor.get_feature(a)
k = self.kernel.kernel_vector(x, self.X)
f = + self.bias + delta
def set_bias(self):
self.bias = np.mean(self.memory.energies)
def train(self):
def optimize_hyperparameters(self):
import numpy as np
from copy import deepcopy
import pdb
import warnings
from operator import itemgetter
from scipy.linalg import cholesky, cho_solve, solve_triangular
from scipy.optimize import fmin_l_bfgs_b
from kernels import clone
class GPR():
Class to calculate similarities between structures based on their feature vectors.
The comparator coresponds to the choice of kernel for the model
Class to calculate the features of structures based on the atomic positions of the structures.
Regularization parameter for the model
Parameters for the compator. This could be the width for the gaussian kernel.
def __init__(self, kernel, featureCalculator, delta_function=None, bias_func=None, optimize=True, n_restarts_optimizer=0, constraint_small_kernel_width=False):
self.kernel = kernel
self.featureCalculator = featureCalculator
self.optimize = optimize
self.n_restarts_optimizer = n_restarts_optimizer
self.constraint_small_kernel_width = constraint_small_kernel_width
self.bias_func = bias_func
self.delta_function = delta_function
# Initialize data counter
self.Ndata = 0
def predict_energy(self, atoms=None, fnew=None, K_vec=None, delta_value=None, return_error=False, no_prior_on_error=False):
Predict the energy of a new structure.
if K_vec is None:
if fnew is None:
fnew = self.featureCalculator.get_feature(atoms)
K_vec = self.kernel_.get_kernel(self.featureMat, fnew).reshape(-1)
if delta_value is None:
if self.delta_function is not None:
delta =
delta = 0
delta = delta_value
Epred = + self.bias + delta
if return_error:
v = cho_solve((self.L_, True), K_vec) # Line 5
K0 = self.kernel_.get_kernel(fnew,fnew)
E_std = np.sqrt(K0 - # Line 6
#E_std = np.sqrt(self.kernel_(fnew) - # Line 6
return Epred, E_std, K0
alpha_err =, K_vec)
K0 = self.kernel_.get_kernel(fnew,fnew)
g = K0 -, alpha_err)
g = max(0,g) # Handle numerical errors.
E_std = np.sqrt(g)
return Epred, E_std, K0
return Epred
def predict_force(self, atoms=None, fnew=None, fgrad=None, return_error=False):
Predict the force of a new structure.
# Calculate features and their gradients if not given
if fnew is None:
fnew = self.featureCalculator.get_feature(atoms)
if fgrad is None:
fgrad = self.featureCalculator.get_featureGradient(atoms)
dk_df = self.kernel_.get_kernel_jac(self.featureMat, fnew)
# Calculate contribution from delta-function
if self.delta_function is not None:
delta_force = self.delta_function.forces(atoms)
Ncoord = 3 * atoms.get_number_of_atoms()
delta_force = np.zeros(Ncoord)
kernelDeriv =, fgrad.T)
F = -(kernelDeriv.T).dot(self.alpha) + delta_force
if return_error:
K_vec = self.kernel_.get_kernel(self.featureMat, fnew)
kernelDeriv =, fgrad.T)
v = cho_solve((self.L_, True), K_vec) # Line 5
error_force = np.sqrt(self.kernel_.get_kernel_jac(fnew,fnew) - # Line 6
K_vec = self.kernel_.get_kernel(self.featureMat, fnew).reshape(-1)
alpha_err =, K_vec)
K0 = self.kernel_.get_kernel(fnew,fnew)
g = K0 -, alpha_err) # negative g can occur due to numerical errors.
if g <= 0:
print('negative g-value: g={}'.format(g))
error_force = 0
error_force = -1/np.sqrt(g) * (kernelDeriv.T).dot(alpha_err)
return F, error_force
return F
def save_data(self, data_values_save, featureMat_save, delta_values_save=None, add_new_data=False):
    """Store (or append to) the training targets, features and delta values.

    data_values_save : 1D array of target values (energies).
    featureMat_save : 2D array of feature vectors, one row per structure.
    delta_values_save : values of the delta function, when one is used.
    add_new_data : if True, append to previously stored data (if any);
        otherwise the stored data is replaced.

    NOTE(review): the else-branches were lost in the diff and are restored.
    """
    Nsave = len(data_values_save)

    if Nsave > 0:
        if add_new_data and self.Ndata > 0:
            # Add data
            self.data_values = np.r_[self.data_values, data_values_save]
            self.featureMat = np.r_[self.featureMat, featureMat_save]
            if self.delta_function is not None:
                self.delta_values = np.r_[self.delta_values, delta_values_save]
            else:
                self.delta_values = np.zeros(1)
            # Iterate data counter
            self.Ndata += Nsave
        else:
            # Initialize data objects
            self.Ndata = len(data_values_save)
            self.data_values = data_values_save
            self.featureMat = featureMat_save
            if self.delta_function is not None:
                self.delta_values = delta_values_save
            else:
                self.delta_values = np.zeros(1)
def calc_bias(self, y):
    """Return the bias to subtract from the targets before regression.

    Uses bias_func(y) when bias_func is callable, bias_func itself when it
    is a constant, and the mean of y when no bias_func is configured.

    NOTE(review): the else/elif structure was lost in the diff; restored so
    the three cases are mutually exclusive.
    """
    if self.bias_func is not None:
        if callable(self.bias_func):
            bias = self.bias_func(y)
        else:
            bias = self.bias_func
    else:
        bias = np.mean(y)
    return bias
def train(self, atoms_list=None, data_values=None, features=None, delta_values=None, add_new_data=True, optimize=None, comm=None):
# NOTE(review): the next nine lines are the remains of a docstring whose
# quotes were stripped by the diff renderer; left byte-identical.
Train the model using gridsearch and cross-validation
--- Input ---
The labels of the new training data. In our case, the energies of the new training structures.
The features of the new training structures.
The atomic positions of the new training structures.
If True, the data passed will be added to previously saved data (if any).
Performs k-fold cross-validation.
Dict containing the sequences of the kernel-width and regularization parameter to be
used in grissearch. The labels are 'sigma' and 'reg' respectively.
# Compute any inputs (features, targets, delta values) not supplied by the caller.
if features is None:
features = self.featureCalculator.get_featureMat(atoms_list)
if data_values is None:
data_values = np.array([atoms.get_potential_energy() for atoms in atoms_list])
if delta_values is None:
if self.delta_function is not None:
# NOTE(review): the call on the next line lost its "" prefix in the diff.
delta_values = np.array([ for a in atoms_list])
# NOTE(review): self.data_values/self.delta_values are read below, but the
# save_data(...) call that populates them appears to have been dropped by the diff.
self.bias = self.calc_bias(self.data_values - self.delta_values)
self.y_train = self.data_values - self.delta_values - self.bias
if optimize is None:
optimize = self.optimize
if comm is not None:
master = comm.rank == 0
# NOTE(review): the else introducing the next line was lost in the diff.
master = False
# NOTE(review): the next two assignments are old/new diff residue of the same line.
self.kernel_ = clone(self.kernel_)
self.kernel_ = clone(self.kernel)
if optimize and self.kernel_.n_dims > 0:
# Choose hyperparameters based on maximizing the log-marginal
# likelihood (potentially starting from several initial values)
def obj_func(theta, eval_gradient=True):
if eval_gradient:
lml, grad = self.log_marginal_likelihood(
theta, eval_gradient=True)
return -lml, -grad
# NOTE(review): an else before the next return was lost in the diff.
return -self.log_marginal_likelihood(theta)
# First optimize starting from theta specified in kernel
if comm is None or master:
# NOTE(review): the next line is truncated mid-call in the diff (its
# theta/bounds arguments and closing brackets are missing), and the
# else before `optima = []` was lost.
optima = [(self._constrained_optimization(obj_func,
optima = []
# Additional runs are performed from log-uniform chosen initial
# theta
if self.n_restarts_optimizer > 0:
if not np.isfinite(self.kernel_.bounds).all():
raise ValueError(
"Multiple optimizer restarts (n_restarts_optimizer>0) "
"requires that all bounds are finite.")
bounds = self.kernel_.bounds
for iteration in range(self.n_restarts_optimizer - master):
theta_initial = \
np.random.uniform(bounds[:, 0], bounds[:, 1])
if self.constraint_small_kernel_width:
theta_initial[4] = np.random.uniform(bounds[4,0], theta_initial[2])
new_optimum = self._constrained_optimization(obj_func, theta_initial, bounds)
if self.constraint_small_kernel_width:
# NOTE(review): the body of the next if (presumably optima.append(new_optimum))
# and its else counterpart were lost in the diff.
if new_optimum[0][4] < new_optimum[0][2]+1:
# Select result from run with minimal (negative) log-marginal
# likelihood
lml_values = list(map(itemgetter(1), optima))
self.kernel_.theta = optima[np.argmin(lml_values)][0]
self.log_marginal_likelihood_value_ = -np.min(lml_values)
#print('lml:', self.log_marginal_likelihood_value_, 'kernel_best:', self.kernel_)
# NOTE(review): the next line is a dangling backslash continuation — its
# right-hand side (likely self.log_marginal_likelihood(self.kernel_.theta))
# was lost in the diff.
self.log_marginal_likelihood_value_ = \
if comm is not None:
hyperparam_results = [self.log_marginal_likelihood_value_, self.kernel_.theta]
hyperparam_results_all = comm.gather(hyperparam_results, root=0)
if master:
lml_all = np.array([result[0] for result in hyperparam_results_all])
index_best_theta = np.argmax(lml_all)
results_best = hyperparam_results_all[index_best_theta]
# NOTE(review): the else introducing the next line was lost in the diff.
results_best = None
results_best = comm.bcast(results_best, root=0)
self.kernel_.theta = results_best[1]
self.log_marginal_likelihood_value_ = results_best[0]
# Precompute quantities required for predictions which are independent
# of actual query points
# NOTE(review): a `try:` opening the block ended by the `except` below was
# lost in the diff.
K = self.kernel_(self.featureMat)
self.L_ = cholesky(K, lower=True) # Line 2
L_inv = np.linalg.inv(self.L_)
self.K_inv = L_inv.T @ L_inv
except np.linalg.LinAlgError as exc:
exc.args = ("The kernel, %s, is not returning a "
"positive definite matrix. Try gradually "
"increasing the 'alpha' parameter of your "
"GaussianProcessRegressor estimator."
% self.kernel_,) + exc.args
self.alpha = cho_solve((self.L_, True), self.y_train) # Line 3
if np.any(np.isnan(self.alpha)):
print('alpha:\n', self.alpha, flush=True)
return 0,0
#return self
def log_marginal_likelihood(self, theta=None, eval_gradient=False):
"""Returns log-marginal likelihood of theta for training data.
theta : array-like, shape = (n_kernel_params,) or None
Kernel hyperparameters for which the log-marginal likelihood is
evaluated. If None, the precomputed log_marginal_likelihood
of ``self.kernel_.theta`` is returned.
eval_gradient : bool, default: False
If True, the gradient of the log-marginal likelihood with respect
to the kernel hyperparameters at position theta is returned
additionally. If True, theta must not be None.
log_likelihood : float
Log-marginal likelihood of theta for training data.
log_likelihood_gradient : array, shape = (n_kernel_params,), optional
Gradient of the log-marginal likelihood with respect to the kernel
hyperparameters at position theta.
Only returned when eval_gradient is True.
if theta is None:
if eval_gradient:
raise ValueError(
"Gradient can only be evaluated for theta!=None")
return self.log_marginal_likelihood_value_
kernel = self.kernel_.clone_with_theta(theta)
if eval_gradient:
K, K_gradient = kernel(self.featureMat, eval_gradient=True)
K = kernel(self.featureMat)
L = cholesky(K, lower=True) # Line 2
except np.linalg.LinAlgError:
return (-np.inf, np.zeros_like(theta)) \
if eval_gradient else -np.inf
# Support multi-dimensional output of self.y_train_
y_train = self.y_train
if y_train.ndim == 1:
y_train = y_train[:, np.newaxis]