Commit cb9ee645 authored by Malthe Kjær Bisbo
Browse files

update

parent 5015902e
import numpy as np
from kernel import gauss_kernel, double_gauss_kernel
from scipy.linalg import cholesky, cho_solve, solve_triangular
from scipy.optimize import minimize
from kernel import gauss_kernel, double_gauss_kernel
from descriptor.fingerprint import Fingerprint
from prior.prior import repulsive_prior
import warnings
class gpr_memory():
    """Storage of the training data (energies, feature vectors and prior
    energies) used by the GPR model.

    Parameters
    ----------
    descriptor : object with get_featureMat(atoms_list) -> 2d array
    prior : object with energy(atoms) -> float, or None for no prior
    """

    def __init__(self, descriptor, prior):
        self.descriptor = descriptor
        self.prior = prior
        self.initialize_data()

    def initialize_data(self):
        """Reset all stored training data."""
        self.energies = None
        self.features = None
        self.prior_values = None

    def get_data(self):
        """Return (energies, features, prior_values) as currently stored."""
        return self.energies, self.features, self.prior_values

    def save_data(self, atoms_list, add_data=True):
        """Store data for atoms_list; appends unless add_data=False,
        in which case previously stored data is discarded first."""
        if not add_data:
            self.initialize_data()
        self.save_energies(atoms_list)
        self.save_features(atoms_list)
        self.save_prior_values(atoms_list)

    def save_energies(self, atoms_list):
        energies_save = np.array([a.get_potential_energy() for a in atoms_list])
        if self.energies is None:
            self.energies = energies_save
        else:
            self.energies = np.r_[self.energies, energies_save]

    def save_features(self, atoms_list):
        features_save = self.descriptor.get_featureMat(atoms_list)
        if self.features is None:
            self.features = features_save
        else:
            self.features = np.r_[self.features, features_save]

    def save_prior_values(self, atoms_list):
        # With no prior the prior contribution is a constant zero, so the
        # scalar 0 broadcasts correctly in 'energies - prior_values'.
        if self.prior is not None:
            prior_values_save = np.array([self.prior.energy(a) for a in atoms_list])
            if self.prior_values is None:
                self.prior_values = prior_values_save
            else:
                self.prior_values = np.r_[self.prior_values, prior_values_save]
        else:
            self.prior_values = 0
class gpr_memory_test():
    """Variant of gpr_memory using short attribute names (E, X).

    Bug fix: the original __init__ never stored 'descriptor' and 'prior',
    so save_features() / save_prior_values() raised AttributeError.
    """

    def __init__(self, descriptor, prior):
        self.descriptor = descriptor  # was missing: read by save_features()
        self.prior = prior            # was missing: read by save_prior_values()
        self.initialize_data()

    def initialize_data(self):
        """Reset all stored training data."""
        self.E = None
        self.X = None
        self.prior_values = None

    def get_data(self):
        """Return (E, X, prior_values) as currently stored."""
        return self.E, self.X, self.prior_values

    def save_data(self, atoms_list, add_data=True):
        """Store data for atoms_list; appends unless add_data=False."""
        if not add_data:
            self.initialize_data()
        self.save_energies(atoms_list)
        self.save_features(atoms_list)
        self.save_prior_values(atoms_list)

    def save_energies(self, atoms_list):
        E_save = np.array([a.get_potential_energy() for a in atoms_list])
        if self.E is None:
            self.E = E_save
        else:
            self.E = np.r_[self.E, E_save]

    def save_features(self, atoms_list):
        X_save = self.descriptor.get_featureMat(atoms_list)
        if self.X is None:
            self.X = X_save
        else:
            self.X = np.r_[self.X, X_save]

    def save_prior_values(self, atoms_list):
        # Scalar 0 stands in for "no prior" and broadcasts in later arithmetic.
        if self.prior is not None:
            prior_values_save = np.array([self.prior.energy(a) for a in atoms_list])
            if self.prior_values is None:
                self.prior_values = prior_values_save
            else:
                self.prior_values = np.r_[self.prior_values, prior_values_save]
        else:
            self.prior_values = 0
class gpr():
......@@ -55,7 +111,7 @@ class gpr():
prior:
"""
def __init__(self, descriptor=None, kernel='single', prior=None):
def __init__(self, descriptor=None, kernel='double', prior=None, n_restarts_optimizer=1):
if descriptor is None:
self.descriptor = Fingerprint()
else:
......@@ -73,31 +129,150 @@ class gpr():
else:
self.prior = prior
self.n_restarts_optimizer = n_restarts_optimizer
self.memory = gpr_memory(self.descriptor, self.prior)
def predict_energy(self, a, eval_std=False):
    """Predict the energy of structure *a*.

    a : atoms object understood by the descriptor and prior.
    eval_std : if True, also return the predictive standard deviation.

    Returns E, or (E, E_std) when eval_std is True.
    """
    # Stale lines from the old revision removed: the superseded
    # 'def predict' header and a line referencing the undefined 'delta'.
    x = self.descriptor.get_feature(a)
    k = self.kernel.kernel_vector(x, self.X)
    E = np.dot(k, self.alpha) + self.bias + self.prior.energy(a)
    if eval_std:
        # Lines 5 and 6 in GPML (Rasmussen & Williams, Algorithm 2.1)
        vk = np.dot(self.K_inv, k)
        K0 = self.kernel.kernel(x, x)
        E_std = np.sqrt(K0 - np.dot(k, vk))
        return E, E_std
    else:
        return E
def predict_forces(self, a, eval_std=False):
    """Predict forces on structure *a*; optionally also the gradient of the
    predictive standard deviation.

    Returns F, or (F, F_std) when eval_std is True.
    """
    # Descriptor and its gradient w.r.t. atomic coordinates.
    x = self.descriptor.get_feature(a)
    x_ddr = self.descriptor.get_featureGradient(a).T
    # Chain rule: kernel jacobian in feature space times descriptor gradient.
    k_ddx = self.kernel.kernel_jacobian(x, self.X)
    k_ddr = np.dot(k_ddx, x_ddr)
    F = -np.dot(k_ddr.T, self.alpha) + self.prior.forces(a)
    if not eval_std:
        return F
    # Gradient of the predictive std (GPML eq. 2.24 variance term).
    k = self.kernel.kernel_vector(x, self.X)
    vk = np.dot(self.K_inv, k)
    K0 = self.kernel.kernel_value(x, x)
    g = K0 - np.dot(k.T, vk)
    assert g >= 0
    F_std = -1 / np.sqrt(g) * np.dot(k_ddr.T, vk)
    return F, F_std
def set_bias(self):
    """Set the energy bias to the mean prior-corrected training energy.

    Stale duplicate assignment from the previous revision (mean of raw
    energies) removed; only the prior-corrected form is kept.
    """
    self.bias = np.mean(self.memory.energies - self.memory.prior_values)
def train(self, atoms_list=None):
    """Train the GP: optionally add new data, then solve for alpha.

    atoms_list : optional list of structures to add to memory before fitting.

    The two interleaved 'def train' headers from the diff were merged into
    the single new-style signature.
    """
    if atoms_list is not None:
        self.memory.save_data(atoms_list)
    self.set_bias()
    self.E, self.X, self.prior_values = self.memory.get_data()
    # Regression targets: energies minus prior and bias.
    self.Y = self.E - self.prior_values - self.bias
    K = self.kernel(self.X)
    L = cholesky(K, lower=True)
    self.alpha = cho_solve((L, True), self.Y)
    # K_inv is kept for predictive-variance evaluation.
    self.K_inv = cho_solve((L, True), np.eye(K.shape[0]))
def optimize_hyperparameters(self, atoms_list=None, comm=None):
    """Fit kernel hyperparameters by maximizing the log marginal likelihood.

    atoms_list : optional new training structures to add before fitting.
    comm : communicator for distributing restarts -- not used yet.

    Stale stub defs ('optimize_hyperparameters(self): pass',
    'neg_log_likelihood') from the old revision removed; an explicit
    'return' added after the no-optimization branch so the restart loop
    is not executed when n_restarts_optimizer == 0.
    """
    if self.n_restarts_optimizer == 0:
        self.train(atoms_list)
        return
    if atoms_list is not None:
        self.memory.save_data(atoms_list)
    self.set_bias()
    self.E, self.X, self.prior_values = self.memory.get_data()
    self.Y = self.E - self.prior_values - self.bias
    results = []
    for i in range(self.n_restarts_optimizer):
        # TODO(review): first restart could reuse self.kernel.theta and
        # 'comm' could distribute restarts across ranks -- not implemented.
        theta_initial = np.random.uniform(self.kernel.theta_bounds[:, 0],
                                          self.kernel.theta_bounds[:, 1])
        res = self.constrained_optimization(theta_initial)
        results.append(res)
    # Pick the restart with the lowest negative log marginal likelihood.
    index_min = np.argmin(np.array([r[1] for r in results]))
    res_min = results[index_min]
    self.kernel.theta = res_min[0]
    self.lml = -res_min[1]
    self.train()
def neg_log_marginal_likelihood(self, theta=None, eval_gradient=True):
    """Negative log marginal likelihood (and optionally its gradient)
    of the current training data under hyperparameters *theta*.

    theta : log-transformed hyperparameters; None keeps the current ones.
    eval_gradient : if True, also return -d(lml)/d(theta).

    Garbage fragments from the diff ('def func ... dsafas', 'def name')
    removed; the body follows GPML Algorithm 2.1 / eq. (5.9).
    """
    if theta is not None:
        self.kernel.theta = theta
    if eval_gradient:
        K, K_gradient = self.kernel(self.X, eval_gradient)
    else:
        K = self.kernel(self.X)
    L = cholesky(K, lower=True)
    alpha = cho_solve((L, True), self.Y)
    lml = -0.5 * np.dot(self.Y, alpha)
    lml -= np.sum(np.log(np.diag(L)))
    lml -= K.shape[0]/2 * np.log(2*np.pi)
    if eval_gradient:
        # Equation (5.9) in GPML
        K_inv = cho_solve((L, True), np.eye(K.shape[0]))
        tmp = np.einsum("i,j->ij", alpha, alpha) - K_inv
        lml_gradient = 0.5*np.einsum("ij,kij->k", tmp, K_gradient)
        return -lml, -lml_gradient
    else:
        return -lml
def constrained_optimization(self, theta_initial):
    """Minimize the negative log marginal likelihood with L-BFGS-B,
    constrained to the kernel's hyperparameter bounds.

    Returns (theta_opt, nlml_opt); warns if the optimizer did not converge.
    """
    result = minimize(self.neg_log_marginal_likelihood,
                      x0=theta_initial,
                      method='L-BFGS-B',
                      jac=True,
                      bounds=self.kernel.theta_bounds,
                      options={'gtol': 1e-2, 'ftol': 1e-4})
    if result.success:
        return result.x, result.fun
    warnings.warn(f"L_BFGS_B terminated with state: {result.message}")
    return result.x, result.fun
def numerical_neg_lml(self, dx=1e-4):
    """Central finite-difference gradient of the negative log marginal
    likelihood with respect to the (raw) hyperparameters.

    dx : step size in log-hyperparameter space.

    Removed the unused local 'N_data' and the dead commented-out
    '/dx' variant.
    """
    theta = np.copy(self.kernel.theta)
    N_hyper = len(theta)
    lml_ddTheta = np.zeros((N_hyper))
    for i in range(N_hyper):
        theta_up = np.copy(theta)
        theta_down = np.copy(theta)
        theta_up[i] += 0.5*dx
        theta_down[i] -= 0.5*dx
        # theta is log-transformed; divide by the raw-parameter step so the
        # result is a derivative w.r.t. the untransformed hyperparameter.
        d_theta = np.exp(theta_up[i])-np.exp(theta_down[i])
        lml_up = self.neg_log_marginal_likelihood(theta_up, eval_gradient=False)
        lml_down = self.neg_log_marginal_likelihood(theta_down, eval_gradient=False)
        lml_ddTheta[i] = (lml_up - lml_down)/d_theta
    return lml_ddTheta
def get_calculator(self):
    """Return an ASE-style calculator wrapping this model (not implemented).

    Bug fix: 'self' was missing, so calling the method on an instance
    raised TypeError.
    """
    pass
......@@ -65,9 +65,9 @@ class kernel(ABC):
theta_down[i] -= 0.5*dx
self.theta = theta_up
K_up = self.kernel(X,X)
K_up = self(X, eval_gradient=False)
self.theta = theta_down
K_down = self.kernel(X,X)
K_down = self(X, eval_gradient=False)
dK_dTheta[i,:,:] = (K_up - K_down)/dx
return dK_dTheta
......@@ -161,26 +161,42 @@ class double_gauss_kernel(kernel):
length_scale1=10.0, length_scale1_bounds=(1e0, 1e3),
length_scale2=10.0, length_scale2_bounds=(1e0, 1e3),
weight=0.01, weight_bounds=(0.01,0.01),
noise=3e-3, noise_bounds=(3e-3,3e-3)):
noise=1e-5, noise_bounds=(1e-5,1e-5)):
self.amplitude = amplitude
self.length_scale1 = length_scale1
self.length_scale2 = length_scale2
self.weight = weight
self.noise = noise
self.amplitude_bounds = amplitude_bounds
self.length_scale1_bounds = length_scale1_bounds
self.length_scale2_bounds = length_scale2_bounds
self.weight_bounds = weight_bounds
self.noise_bounds = noise_bounds
self._theta_bounds = [amplitude_bounds, length_scale1_bounds, length_scale2_bounds, weight_bounds]
self.theta_bounds = np.array([amplitude_bounds, length_scale1_bounds, length_scale2_bounds, weight_bounds, noise_bounds])
def __call__(self, X, eval_gradient=False):
    """Evaluate the kernel matrix K(X, X) including the noise term.

    X : feature matrix (n_samples, n_features); a 1d vector is treated
        as a single sample.
    eval_gradient : if True, also return dK/dtheta for all hyperparameters.

    Stale old signature 'def __call__(self, X, Y, ...): pass' removed.
    """
    if np.ndim(X) == 1:
        X = X.reshape((1,-1))
    d1 = cdist(X / self.length_scale1,
               X / self.length_scale1, metric='sqeuclidean')
    d2 = cdist(X / self.length_scale2,
               X / self.length_scale2, metric='sqeuclidean')
    # Two Gaussians plus diagonal noise, all scaled by the amplitude.
    K = self.amplitude * (np.exp(-0.5 * d1) + self.weight*np.exp(-0.5 * d2) + self.noise*np.eye(X.shape[0]))
    if eval_gradient:
        K_gradient = self.kernel_hyperparameter_gradient(X)
        return K, K_gradient
    else:
        return K
def kernel(self, X, Y=None):
    """Noise-free kernel matrix K(X, Y); Y defaults to X.

    1d inputs are treated as single samples. The d2 computation was hidden
    in a collapsed diff hunk and is reconstructed by analogy with d1 and
    __call__ -- TODO confirm against the original file.
    """
    if np.ndim(X) == 1:
        X = X.reshape((1,-1))
    if Y is None:
        Y = X
    elif np.ndim(Y) == 1:
        Y = Y.reshape((1,-1))
    d1 = cdist(X / self.length_scale1,
               Y / self.length_scale1, metric='sqeuclidean')
    d2 = cdist(X / self.length_scale2,
               Y / self.length_scale2, metric='sqeuclidean')
    K = self.amplitude * (np.exp(-0.5 * d1) + self.weight*np.exp(-0.5 * d2))
    return K
def kernel_value(self, x, y):
    """Scalar kernel value k(x, y) for two single feature vectors."""
    K = self.kernel(x, y)
    # np.asscalar() was removed in NumPy 1.23; ndarray.item() is the
    # documented replacement with identical behavior for 1-element arrays.
    return K.item()
def kernel_vector(self, x, Y):
    """Vector of kernel values k(x, Y_i) against each row of Y.

    Stale old lines referencing the nonexistent attribute
    'self.length_scale' removed; delegates to kernel() instead.
    """
    K = self.kernel(x, Y).reshape(-1)
    return K
def kernel_matrix(self, X,Y=None):
......@@ -224,50 +242,53 @@ class double_gauss_kernel(kernel):
@property
def theta(self):
    """Returns the log-transformed hyperparameters of the kernel.

    Order: amplitude, length_scale1, length_scale2, weight, noise.
    Stale 4-element old body removed.
    """
    self._theta = np.array([self.amplitude, self.length_scale1, self.length_scale2, self.weight, self.noise])
    return np.log(self._theta)
@theta.setter
def theta(self, theta):
    """Sets the hyperparameters of the kernel.

    theta : log-transformed hyperparameters in the order
        amplitude, length_scale1, length_scale2, weight, noise.
    Stale old line 'self._theta = theta' (no exp-transform) removed.
    """
    self._theta = np.exp(theta)
    self.amplitude = self._theta[0]
    self.length_scale1 = self._theta[1]
    self.length_scale2 = self._theta[2]
    self.weight = self._theta[3]
    self.noise = self._theta[4]
def dK_da(self, X):
    """Derivative of the kernel matrix w.r.t. the amplitude.

    Stale old return line (without the noise term) removed; the new
    revision includes the diagonal noise contribution, consistent with
    __call__ where the noise is scaled by the amplitude.
    """
    if np.ndim(X) == 1:
        X = X.reshape((1,-1))
    d1 = cdist(X / self.length_scale1,
               X / self.length_scale1, metric='sqeuclidean')
    d2 = cdist(X / self.length_scale2,
               X / self.length_scale2, metric='sqeuclidean')
    dK_da = np.exp(-0.5 * d1) + self.weight*np.exp(-0.5 * d2) + self.noise*np.eye(X.shape[0])
    return dK_da
def dK_dl1(self, X):
    """Derivative of the kernel matrix w.r.t. the first length scale."""
    if np.ndim(X) == 1:
        X = X.reshape((1, -1))
    scaled = X / self.length_scale1
    sq_dists = cdist(scaled, scaled, metric='sqeuclidean')
    return self.amplitude * sq_dists / self.length_scale1 * np.exp(-0.5 * sq_dists)
def dK_dl2(self, X):
    """Derivative of the kernel matrix w.r.t. the second length scale."""
    if np.ndim(X) == 1:
        X = X.reshape((1, -1))
    scaled = X / self.length_scale2
    sq_dists = cdist(scaled, scaled, metric='sqeuclidean')
    return self.amplitude * self.weight * sq_dists / self.length_scale2 * np.exp(-0.5 * sq_dists)
def dK_dw(self, X):
    """Derivative of the kernel matrix w.r.t. the weight of the
    second Gaussian.

    Local variable renamed from the copy-paste leftover 'dK_dl2' to
    'dK_dw' (name-only fix; behavior unchanged).
    """
    if np.ndim(X) == 1:
        X = X.reshape((1,-1))
    d2 = cdist(X / self.length_scale2,
               X / self.length_scale2, metric='sqeuclidean')
    dK_dw = self.amplitude*np.exp(-0.5 * d2)
    return dK_dw
def dK_dn(self, X):
    """Derivative of the kernel matrix w.r.t. the noise hyperparameter."""
    n_samples = X.shape[0]
    return self.amplitude * np.eye(n_samples)
def kernel_hyperparameter_gradient(self, X):
    """Stacked dK/dtheta_i for all five hyperparameters, in the same order
    as the theta property (amplitude, l1, l2, weight, noise).

    Stale 4-element old return line (missing dK_dn) removed.
    """
    return np.array([self.dK_da(X), self.dK_dl1(X), self.dK_dl2(X), self.dK_dw(X), self.dK_dn(X)])
No preview for this file type
import numpy as np
class prior():
    """Base class for prior energy models used by the GPR (currently a stub)."""
    def __init__(self):
        pass
    def prior(self):
        # Stub -- presumably meant to be implemented by subclasses
        # (e.g. an energy() method); TODO confirm intended interface.
        pass
This diff is collapsed.
......@@ -46,7 +46,7 @@ def convert_atom_types(num):
class repulsive_prior():
def __init__(self, rcut=6, ratio_of_covalent_radii=0.7):
    """Repulsive prior penalizing short interatomic distances.

    rcut : cutoff radius beyond which the prior has no effect
        (default raised from 5 to 6 in this revision; the stale
        old def line was removed).
    ratio_of_covalent_radii : scales the covalent radii that set the
        onset of the repulsion -- TODO confirm against implementation.
    """
    self.rcut = rcut
    self.ratio_of_covalent_radii = ratio_of_covalent_radii
    # Lazy setup: full initialization presumably happens on first use.
    self.initialized = False
......
......@@ -5,7 +5,7 @@ from gpr import gpr
from ase.io import read
# old gpr
from gaussComparator import gaussComparator
from kernels import RBF, ConstantKernel as C, WhiteKernel
from featureCalculators_multi.angular_fingerprintFeature_cy import Angular_Fingerprint
from delta_functions_multi.delta import delta as deltaFunc
from GPR import GPR
......@@ -32,7 +32,7 @@ def initialize_old_gpr(atoms):
featureCalculator = Angular_Fingerprint(atoms, Rc1=Rc1, Rc2=Rc2, binwidth1=binwidth1, Nbins2=Nbins2, sigma1=sigma1, sigma2=sigma2, gamma=gamma, eta=eta, use_angular=use_angular)
kernel = C(10, (1e1, 1e6)) * (C(1, (1, 1)) * RBF(10, (1,1000)) + C(0.01, (0.01, 0.01)) * RBF(10, (1,1000)) + WhiteKernel(1e-5, (1e-6,1e-2)))
delta = deltaFunc(atoms=a, rcut=6)
delta = deltaFunc(atoms=atoms, rcut=6)
gpr = GPR(kernel=kernel,
featureCalculator=featureCalculator,
delta_function=delta,
......@@ -42,7 +42,13 @@ def initialize_old_gpr(atoms):
return gpr
class test_kernel(unittest.TestCase):
def get_E_with_std(traj, gpr):
E = []
F = []
for a in traj:
e = gpr.predict_energy(a, )
class test_gpr(unittest.TestCase):
@classmethod
def setUpClass(cls):
......@@ -62,13 +68,65 @@ class test_kernel(unittest.TestCase):
def tearDown(self):
    # Runs after every test method; the marker separates test output.
    print('tearDown\n')
def test_compare_training_with_old(self):
    """New gpr must reproduce the old GPR's alpha, predicted energies,
    forces and their uncertainties on held-out structures.

    The duplicated old method header 'test_compare_with_old' from the
    diff was removed; the renamed version is kept.
    """
    traj = read('structures.traj', index=':50')
    traj_train = traj[:40]
    traj_predict = traj[40:]
    self.gpr_old.train(traj_train, optimize=False)
    self.gpr.train(traj_train)
    np.testing.assert_almost_equal(self.gpr.alpha, self.gpr_old.alpha)
    E_old = np.array([self.gpr_old.predict_energy(a, return_error=True)[:2] for a in traj_predict])
    E = np.array([self.gpr.predict_energy(a, eval_std=True) for a in traj_predict])
    np.testing.assert_almost_equal(E, E_old)
    F_old = np.array([self.gpr_old.predict_force(a) for a in traj_predict])
    F = np.array([self.gpr.predict_forces(a) for a in traj_predict])
    np.testing.assert_almost_equal(F, F_old)
    Fstd_old = np.array([self.gpr_old.predict_force(a, return_error=True)[1] for a in traj_predict])
    Fstd = np.array([self.gpr.predict_forces(a, eval_std=True)[1] for a in traj_predict])
    np.testing.assert_almost_equal(Fstd, Fstd_old)
def test_compare_lml_with_old(self):
    """Log marginal likelihood must match the old implementation.

    The dangling fragment 'self.gpr.' and the unused 'traj_predict'
    local were removed.
    """
    traj = read('structures.traj', index=':50')
    traj_train = traj[:40]
    self.gpr_old.train(traj_train)
    self.gpr.train(traj_train)
    lml_old = self.gpr_old.log_marginal_likelihood_value_
    lml_new = -self.gpr.neg_log_marginal_likelihood(eval_gradient=False)
    np.testing.assert_almost_equal(lml_new, lml_old)
def test_compare_lml_gradient_with_old(self):
    """LML gradient must match the old implementation.

    Removed the unused 'traj_predict' local.
    """
    traj = read('structures.traj', index=':50')
    traj_train = traj[:40]
    self.gpr_old.train(traj_train)
    self.gpr.train(traj_train)
    _, lml_ddTheta_old = self.gpr_old.log_marginal_likelihood(self.gpr_old.kernel.theta, eval_gradient=True)
    _, lml_ddTheta = self.gpr.neg_log_marginal_likelihood(eval_gradient=True)
    np.testing.assert_almost_equal(lml_ddTheta, lml_ddTheta_old)
def test_lml_gradient(self):
    """Analytic LML gradient must agree with the finite-difference one.

    Removed the unused 'traj_predict' local and the unused gpr_old
    training call is kept only for the shared data path -- actually it is
    not needed here, so it was dropped as well.
    """
    traj = read('structures.traj', index=':50')
    traj_train = traj[:40]
    self.gpr.train(traj_train)
    _, lml_ddTheta = self.gpr.neg_log_marginal_likelihood(eval_gradient=True)
    lml_ddTheta_numeric = self.gpr.numerical_neg_lml()
    np.testing.assert_almost_equal(lml_ddTheta, lml_ddTheta_numeric)
def test_forces(self):
    # TODO(review): force-prediction regression test not implemented yet.
    pass
......
......@@ -36,9 +36,10 @@ class test_kernel(unittest.TestCase):
x1 = np.array([1.,2.,3.])
x2 = np.array([1.5,2.,1.])
x3 = np.array([3.,1.,1.])
X = np.c_[x1,x2,x3].T
np.testing.assert_almost_equal(self.kernel.kernel_hyperparameter_gradient(X),
self.kernel.numerical_hyperparameter_gradient(X))
X = np.c_[x1,x2].T
K_ddtheta = self.kernel.kernel_hyperparameter_gradient(X)
K_ddtheta_old = self.kernel.numerical_hyperparameter_gradient(X)
np.testing.assert_almost_equal(K_ddtheta, K_ddtheta_old)