Source code for src.models.tsvgp

"""
Module for the t-SVGP model
"""

# Copyright Anonymous Authors
# Only for double-blind review. Not to be shared.

# This code has been extended from the SVGP implementation in GPflow and is
# to be released under a compatible license.

import abc

import gpflow
import numpy as np
import tensorflow as tf
from gpflow import kullback_leiblers
from gpflow.conditionals import conditional
from gpflow.config import default_float, default_jitter
from gpflow.covariances import Kuf, Kuu
from gpflow.models.model import GPModel, InputData, MeanAndVariance, RegressionData
from gpflow.models.training_mixins import ExternalDataTrainingLossMixin
from gpflow.models.util import inducingpoint_wrapper

from src.sites import DenseSites
from src.util import (
    conditional_from_precision_sites,
    gradient_transformation_mean_var_to_expectation,
    posterior_from_dense_site,
)


[docs]class base_SVGP(GPModel, ExternalDataTrainingLossMixin, abc.ABC): """ Modified gpflow.svgp.SVGP class to accommodate for different paramaterization of q(u) """ def __init__( self, kernel, likelihood, inducing_variable, *, mean_function=None, num_latent_gps: int = 1, num_data=None, ): """ - kernel, likelihood, inducing_variables, mean_function are appropriate GPflow objects - num_latent_gps is the number of latent processes to use, defaults to 1 - num_data is the total number of observations, defaults to X.shape[0] (relevant when feeding in external minibatches) """ # init the super class, accept args super().__init__(kernel, likelihood, mean_function, num_latent_gps) self.num_data = num_data self.inducing_variable = inducingpoint_wrapper(inducing_variable) @abc.abstractmethod
[docs] def get_mean_chol_cov_inducing_posterior(self): """Returns the mean and cholesky factor of the covariance matrix of q(u)""" raise NotImplementedError
[docs] def prior_kl(self) -> tf.Tensor: """Returns the KL divergence KL[q(u)|p(u)]""" q_mu, q_sqrt = self.get_mean_chol_cov_inducing_posterior() return kullback_leiblers.prior_kl( self.inducing_variable, self.kernel, q_mu, q_sqrt, whiten=False
)
[docs] def maximum_log_likelihood_objective(self, data: RegressionData) -> tf.Tensor: """ The variational lower bound :param data: input data """ return self.elbo(data)
[docs] def elbo(self, data: RegressionData) -> tf.Tensor: """ This gives a variational bound (the evidence lower bound or ELBO) on the log marginal likelihood of the model. :param data: input data """ X, Y = data kl = self.prior_kl() f_mean, f_var = self.predict_f(X, full_cov=False, full_output_cov=False) var_exp = self.likelihood.variational_expectations(f_mean, f_var, Y) if self.num_data is not None: num_data = tf.cast(self.num_data, kl.dtype) minibatch_size = tf.cast(tf.shape(X)[0], kl.dtype) scale = num_data / minibatch_size else: scale = tf.cast(1.0, kl.dtype) return tf.reduce_sum(var_exp) * scale - kl
[docs] def predict_f(self, Xnew: InputData, full_cov=False, full_output_cov=False) -> MeanAndVariance: """ Posterior prediction at new input Xnew :param Xnew: N x D Tensor """ q_mu, q_sqrt = self.get_mean_chol_cov_inducing_posterior() mu, var = conditional( Xnew, self.inducing_variable, self.kernel, q_mu, q_sqrt=q_sqrt, full_cov=full_cov, white=False, full_output_cov=full_output_cov, ) tf.debugging.assert_positive(var) return mu + self.mean_function(Xnew), var
[docs]class t_SVGP(base_SVGP): """ Class for the t-SVGP model """ def __init__( self, kernel, likelihood, inducing_variable, *, mean_function=None, num_latent_gps: int = 1, lambda_1=None, lambda_2_sqrt=None, num_data=None, force=False, ): """ - kernel, likelihood, inducing_variables, mean_function are appropriate GPflow objects - num_latent_gps is the number of latent processes to use, defaults to 1 - q_diag is a boolean. If True, the covariance is approximated by a diagonal matrix. - whiten is a boolean. If True, we use the whitened representation of the inducing points. - num_data is the total number of observations, defaults to X.shape[0] (relevant when feeding in external minibatches) """ # init the super class, accept args GPModel.__init__(self, kernel, likelihood, mean_function, num_latent_gps) self.num_data = num_data self.inducing_variable = inducingpoint_wrapper(inducing_variable) # init variational parameters self.num_inducing = self.inducing_variable.num_inducing self._init_variational_parameters(self.num_inducing, lambda_1, lambda_2_sqrt) self.whiten = False self.force = force
[docs] def _init_variational_parameters(self, num_inducing, lambda_1, lambda_2_sqrt, **kwargs): """ Constructs the site parameters λ₁, Λ₂. for site t(u) = exp(uᵀλ₁ - ½ uᵀΛ₂u) Parameters ---------- :param num_inducing: int Number of inducing variables, typically referred to as M. :param lambda_1: np.array or None First order natural parameter of the variational site. :param lambda_2_sqrt: np.array or None Second order natural parameter of the variational site. """ lambda_1 = np.zeros((num_inducing, self.num_latent_gps)) if lambda_1 is None else lambda_1 if lambda_2_sqrt is None: lambda_2_sqrt = [ -tf.eye(num_inducing, dtype=default_float()) * 1e-10 for _ in range(self.num_latent_gps) ] lambda_2_sqrt = np.array(lambda_2_sqrt) else: assert lambda_2_sqrt.ndim == 3 self.num_latent_gps = lambda_2_sqrt.shape[0] self.sites = DenseSites(lambda_1, lambda_2_sqrt)
@property
[docs] def lambda_1(self): """first natural parameter""" return self.sites.lambda_1
@property
[docs] def lambda_2_sqrt(self): """Cholesky factor of the second natural parameter""" return self.sites.lambda_2_sqrt
@property
[docs] def lambda_2(self): """second natural parameter""" return tf.matmul(self.lambda_2_sqrt, self.lambda_2_sqrt, transpose_b=True)
[docs] def get_mean_chol_cov_inducing_posterior(self): """ Computes the mean and cholesky factor of the posterior on the inducing variables q(u) = 𝓝(u; m, S) S = (K⁻¹ + Λ₂)⁻¹ = (K⁻¹ + L₂L₂ᵀ)⁻¹ = K - KL₂W⁻¹L₂ᵀK , W = (I + L₂ᵀKL₂)⁻¹ m = S λ₁ """ K_uu = Kuu( self.inducing_variable, self.kernel, jitter=default_jitter() ) # [P, M, M] or [M, M] return posterior_from_dense_site(K_uu, self.lambda_1, self.lambda_2_sqrt)
# todo : make broadcastable
[docs] def new_predict_f( self, Xnew: InputData, full_cov=False, full_output_cov=False ) -> MeanAndVariance: """ Posterior prediction at new input Xnew :param Xnew: N x D Tensor """ K_uu = Kuu( self.inducing_variable, self.kernel, jitter=default_jitter() ) # [P, M, M] or [M, M] K_uf = Kuf(self.inducing_variable, self.kernel, Xnew) # [P, M, M] or [M, M] K_ff = self.kernel.K_diag(Xnew)[..., None] mu, var = conditional_from_precision_sites( K_uu, K_ff, K_uf, self.lambda_1, L=self.lambda_2_sqrt ) tf.debugging.assert_positive(var) # We really should make the tests pass with this here return mu + self.mean_function(Xnew), var
[docs] def natgrad_step(self, data, lr=0.1, jitter=1e-9): """Takes natural gradient step in Variational parameters in the local parameters λₜ = rₜ▽[Var_exp] + (1-rₜ)λₜ₋₁ Input: :param: X : N x D :param: Y: N x 1 :param: lr: Scalar Output: Updates the params """ X, Y = data mean, var = self.predict_f(X) # todo : hack to get heterokedastic demo to run if isinstance( self.inducing_variable, gpflow.inducing_variables.SharedIndependentInducingVariables ): meanZ, _ = self.predict_f(self.inducing_variable.inducing_variables[0].Z) else: meanZ, _ = self.predict_f(self.inducing_variable.Z) with tf.GradientTape() as g: g.watch([mean, var]) ve = self.likelihood.variational_expectations(mean, var, Y) grads = g.gradient(ve, [mean, var]) # cropping grads to stay negative eps = 1e-8 grads[1] = tf.minimum(grads[1], -eps * tf.ones_like(grads[1])) Id = tf.eye(self.num_inducing, dtype=tf.float64) # Compute the projection matrix A from prior information K_uu = Kuu(self.inducing_variable, self.kernel) K_uf = Kuf(self.inducing_variable, self.kernel, X) # [P, M, M] or [M, M] chol_Kuu = tf.linalg.cholesky(K_uu + Id * jitter) A = tf.transpose(tf.linalg.cholesky_solve(chol_Kuu, K_uf)) # ▽μ₁[Var_exp] = aₙαₙ , # ▽μ2[Var_exp] = λₙaₙaₙᵀ if tf.rank(A) == 2: A = tf.tile(A[..., None], [1, 1, self.num_latent_gps]) grads = [ tf.einsum("nml,nl->ml", A, grads[0]), tf.einsum("nml,nol,nl->lmo", A, A, grads[1]), ] # chain rule at f grad_mu = gradient_transformation_mean_var_to_expectation(meanZ, grads) if self.num_data is not None: num_data = tf.cast(self.num_data, dtype=tf.float64) minibatch_size = tf.cast(tf.shape(X)[0], dtype=tf.float64) scale = num_data / minibatch_size else: scale = tf.cast(1.0, dtype=tf.float64) lambda_2 = -0.5 * self.lambda_2 lambda_1 = self.lambda_1 # compute update in natural form lambda_1 = (1 - lr) * lambda_1 + lr * scale * grad_mu[0] lambda_2 = (1 - lr) * lambda_2 + lr * scale * grad_mu[1] # transform and perform update lambda_2_sqrt = -tf.linalg.cholesky(-2.0 * lambda_2 + Id * jitter) # To match SVGP you need to eliminate this jitter for minibatching self.lambda_1.assign(lambda_1) self.lambda_2_sqrt.assign(lambda_2_sqrt) self.get_mean_chol_cov_inducing_posterior()