Source code for torch_openreml.reml

"""
Restricted maximum likelihood (REML) estimation.

This module implements REML estimation for linear mixed-effects models
via the average information (AI) algorithm. The AI algorithm combines
the computational efficiency of expected and observed information matrices
to achieve fast, robust convergence for variance component estimation.

Classes:
    REML:
        REML estimator with AI-based optimisation, supporting fixed-effect
        estimation via BLUE, random-effect prediction via BLUP, and
        marginal and conditional prediction.
"""

import torch
from torch_openreml.utils import get_device, get_dtype
from torch_openreml.covariance.matrix import Matrix
from tqdm import tqdm

class REML:
    r"""REML estimator for linear mixed-effects models.

    Fits variance components :math:`\boldsymbol{\theta}` by maximising the
    restricted log-likelihood

    .. math::
        \ell_R(\boldsymbol{\theta}) = -\frac{1}{2} \left(
        \log |\symbf{V}(\boldsymbol{\theta})|
        + \log |\symbf{X}^\top \symbf{V}(\boldsymbol{\theta})^{-1} \symbf{X}|
        + \symbf{y}^\top \symbf{P} \symbf{y} \right)

    where :math:`\symbf{P} = \symbf{V}^{-1} - \symbf{V}^{-1}\symbf{X}
    (\symbf{X}^\top\symbf{V}^{-1}\symbf{X})^{-1}\symbf{X}^\top\symbf{V}^{-1}`
    is the projection matrix and :math:`\symbf{V}(\boldsymbol{\theta}) =
    \symbf{Z}\symbf{G}(\boldsymbol{\theta})\symbf{Z}^\top +
    \symbf{R}(\boldsymbol{\theta})` is the marginal covariance matrix.

    Optimisation uses the average information (AI) algorithm, which forms a
    quasi-Newton step :math:`\Delta = \symbf{AI}^{-1} \symbf{s}` from the
    score vector :math:`\symbf{s}` and the AI matrix at each iteration.

    The covariance model :math:`\symbf{V}(\boldsymbol{\theta})` is supplied
    either as a :class:`~torch_openreml.covariance.matrix.Matrix` instance via
    ``v_builder``, or as a pair of callables ``map_theta_to_v`` and
    ``map_theta_to_dv``. If ``map_theta_to_dv`` is omitted, the Jacobian is
    computed automatically via :func:`torch.func.jacrev`.
    """

    def __init__(self, v_builder=None, map_theta_to_v=None, map_theta_to_g=None, map_theta_to_dv=None):
        r"""Initialize a REML estimator.

        Args:
            v_builder (Matrix, optional): A
                :class:`~torch_openreml.covariance.matrix.Matrix` instance whose
                ``map_theta_to_v`` and ``map_theta_to_dv`` methods are used to
                construct :math:`\symbf{V}` and its Jacobian. Takes precedence
                over ``map_theta_to_v`` and ``map_theta_to_dv`` if both are
                provided.
            map_theta_to_v (callable, optional): Maps a flat parameter tensor
                :math:`\boldsymbol{\theta}` to the covariance matrix
                :math:`\symbf{V}`. Required if ``v_builder`` is not provided.
            map_theta_to_g (callable, optional): Maps
                :math:`\boldsymbol{\theta}` to the random-effect covariance
                matrix :math:`\symbf{G}`. Required for :meth:`blup`,
                :meth:`predict`, and :meth:`residual`.
            map_theta_to_dv (callable, optional): Maps
                :math:`\boldsymbol{\theta}` to the Jacobian of
                :math:`\symbf{V}`, a 3D tensor of shape ``(num_params, n, n)``.
                If ``None``, the Jacobian is computed via automatic
                differentiation.

        Raises:
            TypeError: If ``v_builder`` is not a
                :class:`~torch_openreml.covariance.matrix.Matrix` instance.
            ValueError: If neither ``v_builder`` nor ``map_theta_to_v`` is
                provided.

        Example:
            >>> mat = ScalarMatrix(n)
            >>> reml = REML(v_builder=mat)
            >>> theta_hat, beta_hat, n_iter = reml.optimize(y, x, theta)
        """
        if v_builder is not None and not isinstance(v_builder, Matrix):
            raise TypeError("'v_builder' must be a Matrix instance or None!")
        if v_builder is None and map_theta_to_v is None:
            raise ValueError("At least one of 'v_builder' or 'map_theta_to_v' must be provided!")
        if v_builder is not None:
            self.map_theta_to_v = v_builder.map_theta_to_v
            self.map_theta_to_dv = v_builder.map_theta_to_dv
        else:
            self.map_theta_to_v = map_theta_to_v
            self.map_theta_to_dv = map_theta_to_dv
        # Build the autodiff Jacobian only when no analytic Jacobian is
        # available; previously jacrev was constructed even when
        # map_theta_to_dv was supplied, and never on the v_builder path
        # (leaving self.jacobian_func undefined there).
        if self.map_theta_to_dv is None:
            self.jacobian_func = torch.func.jacrev(self.map_theta_to_v)
        else:
            self.jacobian_func = None
        self.map_theta_to_g = map_theta_to_g
[docs] def blue(self, y, x, theta): r""" Compute the best linear unbiased estimator (BLUE) of fixed effects. Solves for :math:`\hat{\boldsymbol{\beta}}` via generalised least squares: .. math:: \hat{\boldsymbol{\beta}} = (\symbf{X}^\top \symbf{V}(\boldsymbol{\theta})^{-1} \symbf{X})^{-1} \symbf{X}^\top \symbf{V}(\boldsymbol{\theta})^{-1} \symbf{y} Args: y (torch.Tensor): Response vector of shape ``(n,)``. x (torch.Tensor): Design matrix of shape ``(n, p)``. theta (torch.Tensor): Flat variance component parameter tensor. Returns: torch.Tensor: Fixed-effect estimate :math:`\hat{\boldsymbol{\beta}}` of shape ``(p,)``. """ device = get_device(y, x, theta) dtype = get_dtype(y, x, theta) scalar = {} matrix = {} scalar["N"] = y.shape[0] matrix["V"] = self.map_theta_to_v(theta) matrix["V"] = matrix["V"] + 1e-6 * torch.eye(scalar["N"], device=device, dtype=dtype) matrix["L"] = torch.linalg.cholesky(matrix["V"]) matrix["Y"] = y.unsqueeze(-1) matrix["X"] = x matrix["V^{-1} Y"] = torch.cholesky_solve(matrix["Y"], matrix["L"]) matrix["V^{-1} X"] = torch.cholesky_solve(matrix["X"], matrix["L"]) matrix["X^T V^{-1} X"] = matrix["X"].T @ matrix["V^{-1} X"] matrix["X^T V^{-1} Y"] = matrix["X"].T @ matrix["V^{-1} Y"] matrix["L_{X^T V^{-1} X}"] = torch.linalg.cholesky(matrix["X^T V^{-1} X"]) matrix[r"\hat{\beta}"] = torch.cholesky_solve(matrix["X^T V^{-1} Y"], matrix["L_{X^T V^{-1} X}"]) return matrix[r"\hat{\beta}"].squeeze()
[docs] def marginal_predict(self, y, x, theta): r""" Compute marginal fitted values from fixed effects only. .. math:: \hat{\symbf{y}} = \symbf{X} \hat{\boldsymbol{\beta}} Args: y (torch.Tensor): Response vector of shape ``(n,)``. x (torch.Tensor): Design matrix of shape ``(n, p)``. theta (torch.Tensor): Flat variance component parameter tensor. Returns: torch.Tensor: Marginal fitted values of shape ``(n,)``. """ vector = {} vector[r"\hat{\beta}"] = self.blue(y, x, theta) return x @ vector[r"\hat{\beta}"]
[docs] def marginal_residual(self, y, x, theta): r""" Compute marginal residuals from fixed effects only. .. math:: \hat{\symbf{e}} = \symbf{y} - \symbf{X}\hat{\boldsymbol{\beta}} Args: y (torch.Tensor): Response vector of shape ``(n,)``. x (torch.Tensor): Design matrix of shape ``(n, p)``. theta (torch.Tensor): Flat variance component parameter tensor. Returns: torch.Tensor: Marginal residuals of shape ``(n,)``. """ return y - self.marginal_predict(y, x, theta)
[docs] def blup(self, y, x, z, theta, map_theta_to_g=None): r""" Compute the best linear unbiased predictor (BLUP) of random effects. .. math:: \hat{\symbf{u}} = \symbf{G} \symbf{Z}^\top \symbf{V}^{-1}(\boldsymbol{\theta}) \hat{\symbf{e}} where :math:`\hat{\symbf{e}}` are the marginal residuals. Args: y (torch.Tensor): Response vector of shape ``(n,)``. x (torch.Tensor): Fixed-effect design matrix of shape ``(n, p)``. z (torch.Tensor): Random-effect design matrix of shape ``(n, q)``. theta (torch.Tensor): Flat variance component parameter tensor. map_theta_to_g (callable, optional): Maps ``theta`` to the random-effect covariance matrix :math:`\symbf{G}`. Defaults to :attr:`map_theta_to_g` set at initialisation. Returns: torch.Tensor: Random-effect predictions of shape ``(q,)``. """ if map_theta_to_g is None: map_theta_to_g = self.map_theta_to_g device = get_device(y, x, z, theta) dtype = get_dtype(y, x, z, theta) scalar = {} matrix = {} scalar["N"] = y.shape[0] matrix["V"] = self.map_theta_to_v(theta) matrix["V"] = matrix["V"] + 1e-6 * torch.eye(scalar["N"], device=device, dtype=dtype) matrix["L"] = torch.linalg.cholesky(matrix["V"]) matrix["Y"] = y.unsqueeze(-1) matrix["e"] = self.marginal_residual(y, x, theta).unsqueeze(-1) matrix["V^{-1} e"] = torch.cholesky_solve(matrix["e"], matrix["L"]) matrix["G"] = map_theta_to_g(theta) return (matrix["G"] @ (z.T @ matrix["V^{-1} e"])).squeeze()
[docs] def predict(self, y, x, z, theta, map_theta_to_g=None): r""" Compute conditional fitted values including random effects. .. math:: \hat{\symbf{y}} = \symbf{X}\hat{\boldsymbol{\beta}} + \symbf{Z}\hat{\symbf{u}} Args: y (torch.Tensor): Response vector of shape ``(n,)``. x (torch.Tensor): Fixed-effect design matrix of shape ``(n, p)``. z (torch.Tensor): Random-effect design matrix of shape ``(n, q)``. theta (torch.Tensor): Flat variance component parameter tensor. map_theta_to_g (callable, optional): Maps ``theta`` to :math:`\symbf{G}`. Defaults to :attr:`map_theta_to_g` set at initialisation. Returns: torch.Tensor: Conditional fitted values of shape ``(n,)``. """ if map_theta_to_g is None: map_theta_to_g = self.map_theta_to_g device = get_device(y, x, z, theta) dtype = get_dtype(y, x, z, theta) scalar = {} matrix = {} scalar["N"] = y.shape[0] matrix["V"] = self.map_theta_to_v(theta) matrix["V"] = matrix["V"] + 1e-6 * torch.eye(scalar["N"], device=device, dtype=dtype) matrix["L"] = torch.linalg.cholesky(matrix["V"]) matrix["Y"] = y.unsqueeze(-1) matrix["X"] = x matrix["V^{-1} Y"] = torch.cholesky_solve(matrix["Y"], matrix["L"]) matrix["V^{-1} X"] = torch.cholesky_solve(matrix["X"], matrix["L"]) matrix["X^T V^{-1} X"] = matrix["X"].T @ matrix["V^{-1} X"] matrix["X^T V^{-1} Y"] = matrix["X"].T @ matrix["V^{-1} Y"] matrix["L_{X^T V^{-1} X}"] = torch.linalg.cholesky(matrix["X^T V^{-1} X"]) matrix[r"\beta"] = torch.cholesky_solve(matrix["X^T V^{-1} Y"], matrix["L_{X^T V^{-1} X}"]) matrix[r"\hat{Y}"] = matrix["X"] @ matrix[r"\beta"] matrix["e"] = matrix["Y"] - matrix[r"\hat{Y}"] matrix["V^{-1} e"] = torch.cholesky_solve(matrix["e"], matrix["L"]) matrix["G"] = map_theta_to_g(theta) return (matrix[r"\hat{Y}"] + (matrix["G"] @ (z.T @ matrix["V^{-1} e"]))).squeeze()
[docs] def residual(self, y, x, z, theta, map_theta_to_g=None): r""" Compute conditional residuals including random effects. .. math:: \hat{\symbf{e}} = \symbf{y} - \hat{\symbf{y}} where :math:`\hat{\symbf{y}}` is the conditional prediction from :meth:`predict`. Args: y (torch.Tensor): Response vector of shape ``(n,)``. x (torch.Tensor): Fixed-effect design matrix of shape ``(n, p)``. z (torch.Tensor): Random-effect design matrix of shape ``(n, q)``. theta (torch.Tensor): Flat variance component parameter tensor. map_theta_to_g (callable, optional): Maps ``theta`` to :math:`\symbf{G}`. Defaults to :attr:`map_theta_to_g` set at initialisation. Returns: torch.Tensor: Conditional residuals of shape ``(n,)``. """ if map_theta_to_g is None: map_theta_to_g = self.map_theta_to_g return y - self.predict(y, x, z, theta, map_theta_to_g)
[docs] def loglik(self, y, x, theta): r""" Evaluate the REML log-likelihood. .. math:: \ell_R(\boldsymbol{\theta}) = -\frac{1}{2} \left( \log |\symbf{V}(\boldsymbol{\theta})| + \log |\symbf{X}^\top \symbf{V}(\boldsymbol{\theta})^{-1} \symbf{X}| + \symbf{y}^\top \symbf{P} \symbf{y} \right) Args: y (torch.Tensor): Response vector of shape ``(n,)``. x (torch.Tensor): Design matrix of shape ``(n, p)``. theta (torch.Tensor): Flat variance component parameter tensor. Returns: torch.Tensor: Scalar REML log-likelihood value. """ device = get_device(y, x, theta) dtype = get_dtype(y, x, theta) scalar = {} matrix = {} scalar["N"] = y.shape[0] matrix["V"] = self.map_theta_to_v(theta) matrix["V"] = matrix["V"] + 1e-6 * torch.eye(scalar["N"], device=device, dtype=dtype) matrix["L"] = torch.linalg.cholesky(matrix["V"]) matrix["Y"] = y.unsqueeze(-1) matrix["V^{-1} Y"] = torch.cholesky_solve(matrix["Y"], matrix["L"]) matrix["V^{-1} X"] = torch.cholesky_solve(matrix["X"], matrix["L"]) matrix["X^T V^{-1} X"] = matrix["X"].T @ matrix["V^{-1} X"] matrix["X^T V^{-1} Y"] = matrix["X"].T @ matrix["V^{-1} Y"] matrix["L_{X^T V^{-1} X}"] = torch.linalg.cholesky(matrix["X^T V^{-1} X"]) matrix[r"\hat{\beta}"] = torch.cholesky_solve(matrix["X^T V^{-1} Y"], matrix["L_{X^T V^{-1} X}"]) matrix["e"] = matrix["Y"] - matrix["X"] @ matrix[r"\hat{\beta}"] matrix["V^{-1} e"] = torch.cholesky_solve(matrix["e"], matrix["L"]) scalar["log |V|"] = 2.0 * torch.sum(torch.log(torch.diag(matrix["L"]))) scalar["log |X^T V^{-1} X|"] = 2.0 * torch.sum(torch.log(torch.diag(matrix["L_{X^T V^{-1} X}"]))) scalar["Y^T P Y"] = (matrix["e"].T @ matrix["V^{-1} e"]).squeeze() return -0.5 * (scalar["log |V|"] + scalar["log |X^T V^{-1} X|"] + scalar["Y^T P Y"])
[docs] def compute_v_dv(self, theta): r""" Compute the covariance matrix and its Jacobian. Calls :attr:`map_theta_to_v` to build :math:`\symbf{V}(\boldsymbol{\theta})` and either :attr:`map_theta_to_dv` or :func:`torch.func.jacrev` to build the Jacobian :math:`\partial\symbf{V}(\boldsymbol{\theta})/\partial\boldsymbol{\theta}`. Args: theta (torch.Tensor): Flat variance component parameter tensor. Returns: tuple: ``(v, dv)``, where ``v`` is the covariance matrix of shape ``(n, n)`` and ``dv`` is the Jacobian of shape ``(num_params, n, n)``. """ v = self.map_theta_to_v(theta) if self.map_theta_to_dv is None: jacobian = self.jacobian_func(theta) dv = jacobian.permute(2, 0, 1) else: dv = self.map_theta_to_dv(theta) return v, dv
    def ai_step(self, y, x, theta, require_loglik=True, require_beta=True):
        r"""Perform a single average information (AI) algorithm step.

        Computes the score vector :math:`\symbf{s}`, the AI matrix, and
        optionally the REML log-likelihood and fixed-effect estimate at the
        current :math:`\boldsymbol{\theta}`:

        .. math::
            s_k &= \frac{1}{2}\left( \symbf{y}^\top \symbf{P}
            \frac{\partial\symbf{V}(\boldsymbol{\theta})}{\partial\theta_k}
            \symbf{P} \symbf{y} - \mathrm{tr}\!\left(\symbf{P}
            \frac{\partial\symbf{V}(\boldsymbol{\theta})}{\partial\theta_k}
            \right) \right) \\
            \mathrm{AI}_{kj} &= \frac{1}{2} \symbf{y}^\top \symbf{P}
            \frac{\partial\symbf{V}(\boldsymbol{\theta})}{\partial\theta_k}
            \symbf{P}
            \frac{\partial\symbf{V}(\boldsymbol{\theta})}{\partial\theta_j}
            \symbf{P} \symbf{y}

        Args:
            y (torch.Tensor): Response vector of shape ``(n,)``.
            x (torch.Tensor): Design matrix of shape ``(n, p)``.
            theta (torch.Tensor): Flat variance component parameter tensor.
            require_loglik (bool, optional): Whether to evaluate the REML
                log-likelihood. Defaults to ``True``.
            require_beta (bool, optional): Whether to compute the fixed-effect
                estimate. Defaults to ``True``.

        Returns:
            tuple: ``(beta, score, ai, loglik)``, where ``beta`` is of shape
            ``(p,)``, ``score`` of shape ``(num_params,)``, ``ai`` of shape
            ``(num_params, num_params)``, and ``loglik`` is a scalar tensor.
            ``beta`` and ``loglik`` are ``torch.nan`` if their respective
            ``require_*`` flag is ``False``.
        """
        device = get_device(y, x, theta)
        dtype = get_dtype(y, x, theta)
        # Intermediate results keyed by their mathematical expression.
        matrix = {}
        vector = {}
        scalar = {}
        tensor3d = {}
        matrix["X"] = x
        matrix["Y"] = y.unsqueeze(-1)
        scalar["N"] = y.shape[0]
        matrix["V"], tensor3d["dV"] = self.compute_v_dv(theta)
        # K = number of variance components (leading axis of the Jacobian).
        scalar["K"] = len(tensor3d["dV"])
        # Small diagonal jitter keeps the Cholesky factorisation stable.
        matrix["V"] = matrix["V"] + 1e-6 * torch.eye(scalar["N"], device=device, dtype=dtype)
        matrix["L"] = torch.linalg.cholesky(matrix["V"])
        matrix["V^{-1} Y"] = torch.cholesky_solve(matrix["Y"], matrix["L"])
        matrix["V^{-1} X"] = torch.cholesky_solve(matrix["X"], matrix["L"])
        matrix["X^T V^{-1} X"] = matrix["X"].T @ matrix["V^{-1} X"]
        matrix["L_{X^T V^{-1} X}"] = torch.linalg.cholesky(matrix["X^T V^{-1} X"])
        # Build P y = V^{-1} y - V^{-1} X (X^T V^{-1} X)^{-1} X^T V^{-1} y
        # without ever forming the n-by-n projection matrix P explicitly.
        matrix["(X^T V^{-1} X)^{-1} X^T V{-1}"] = torch.cholesky_solve(matrix["V^{-1} X"].T, matrix["L_{X^T V^{-1} X}"])
        matrix["V^{-1} X (X^T V^{-1} X)^{-1} X^T V{-1}"] = matrix["V^{-1} X"] @ matrix["(X^T V^{-1} X)^{-1} X^T V{-1}"]
        matrix["V^{-1} X (X^T V^{-1} X)^{-1} X^T V{-1} Y"] = matrix["V^{-1} X (X^T V^{-1} X)^{-1} X^T V{-1}"] @ matrix["Y"]
        matrix["P Y"] = matrix["V^{-1} Y"] - matrix["V^{-1} X (X^T V^{-1} X)^{-1} X^T V{-1} Y"]
        # Batched P dV over the K parameter slices of the Jacobian.
        tensor3d["V^{-1} dV"] = torch.cholesky_solve(tensor3d["dV"], matrix["L"])
        tensor3d["V^{-1} X (X^T V^{-1} X)^{-1} X^T V{-1} dV"] = matrix["V^{-1} X (X^T V^{-1} X)^{-1} X^T V{-1}"] @ tensor3d["dV"]
        tensor3d["P dV"] = tensor3d["V^{-1} dV"] - tensor3d["V^{-1} X (X^T V^{-1} X)^{-1} X^T V{-1} dV"]
        tensor3d["P dV P Y"] = tensor3d["P dV"] @ matrix["P Y"]
        vector["Y^T P dV P Y"] = (matrix["Y"].T @ tensor3d["P dV P Y"]).squeeze()
        # Per-parameter traces tr(P dV_k), vectorised over the batch axis.
        vector["tr(P dV)"] = torch.vmap(torch.trace)(tensor3d["P dV"])
        # Score vector
        vector["score"] = 0.5 * (vector["Y^T P dV P Y"] - vector["tr(P dV)"])
        # AI matrix
        tensor3d["Y^T P dV"] = matrix["Y"].T @ tensor3d["P dV"]
        matrix["AI"] = 0.5 * (tensor3d["Y^T P dV"].squeeze() @ tensor3d["P dV P Y"].squeeze().T)
        # squeeze() collapses the K = 1 case to a scalar; restore 2D shape.
        if matrix["AI"].ndim != 2:
            if matrix["AI"].numel() == 1:
                matrix["AI"] = matrix["AI"].reshape(1, 1)
            else:
                raise RuntimeError("AI matrix is not a 2D tensor!")
        if matrix["AI"].shape[0] != matrix["AI"].shape[1]:
            raise RuntimeError("AI matrix is not a square matrix!")
        # REML log-likelihood
        if require_loglik:
            scalar["log |V|"] = 2.0 * torch.sum(torch.log(torch.diag(matrix["L"])))
            scalar["log |X^T V^{-1} X|"] = 2.0 * torch.sum(torch.log(torch.diag(matrix["L_{X^T V^{-1} X}"])))
            scalar["Y^T P Y"] = (matrix["Y"].T @ matrix["P Y"]).squeeze()
            scalar["loglik"] = -0.5 * (scalar["log |V|"] + scalar["log |X^T V^{-1} X|"] + scalar["Y^T P Y"])
        else:
            scalar["loglik"] = torch.nan
        # Beta
        if require_beta:
            vector["beta"] = (matrix["(X^T V^{-1} X)^{-1} X^T V{-1}"] @ matrix["Y"]).squeeze()
        else:
            vector["beta"] = torch.nan
        return vector["beta"], vector["score"], matrix["AI"], scalar["loglik"]
[docs] def get_theta(self, select="last", history=None): """ Retrieve a parameter estimate from the optimisation history. Args: select (str, optional): ``"last"`` returns the final iterate; any other value returns the iterate with the highest log-likelihood. Defaults to ``"last"``. history (dict, optional): History dictionary to query. Defaults to :attr:`history` populated by :meth:`optimize`. Returns: torch.Tensor: Selected parameter tensor :math:`\\boldsymbol{\\theta}`. """ if history is None: history = self.history if select == "last": return self.history["theta"][-1] else: if torch.is_tensor(self.history["loglik"][-1]): index = torch.argmax(torch.stack(self.history["loglik"])).item() return self.history["theta"][index] else: return self.history["theta"][-1]
[docs] def get_beta(self, select="last", history=None): """ Retrieve a fixed-effect estimate from the optimisation history. Args: select (str, optional): ``"last"`` returns the final iterate; any other value returns the iterate with the highest log-likelihood. Defaults to ``"last"``. history (dict, optional): History dictionary to query. Defaults to :attr:`history` populated by :meth:`optimize`. Returns: torch.Tensor: Selected fixed-effect estimate :math:`\\hat{\\boldsymbol{\\beta}}`. """ if history is None: history = self.history if select == "last": return self.history["beta"][-1] else: if torch.is_tensor(self.history["loglik"][-1]): index = torch.argmax(torch.stack(self.history["loglik"])).item() return self.history["beta"][index] else: return self.history["beta"][-1]
[docs] def is_converged(self, check_score=True, check_delta=True, check_loglik=True, tol_score=1e-4, tol_delta=1e-4, tol_loglik=1e-4): """ Check whether the optimisation has converged. Convergence is declared when all enabled criteria fall below their respective tolerances. At least two iterations must have completed before any criterion can be satisfied. Args: check_score (bool, optional): Check the norm of the score vector. Defaults to ``True``. check_delta (bool, optional): Check the norm of the parameter update :math:`\\Delta`. Defaults to ``True``. check_loglik (bool, optional): Check the absolute change in log-likelihood between successive iterates. Defaults to ``True``. tol_score (float, optional): Score norm tolerance. Defaults to ``1e-4``. tol_delta (float, optional): Parameter update norm tolerance. Defaults to ``1e-4``. tol_loglik (float, optional): Log-likelihood change tolerance. Defaults to ``1e-4``. Returns: bool: ``True`` if all enabled criteria are satisfied, ``False`` otherwise. """ if len(self.history["score"]) < 2: return False if check_score: score_norm = torch.norm(self.history["score"][-1]).item() if score_norm >= tol_score: return False if check_delta: delta_norm = torch.norm(self.history["delta"][-1]).item() if delta_norm >= tol_delta: return False if check_loglik and torch.is_tensor(self.history["loglik"][-1]): loglik_diff = torch.abs(self.history["loglik"][-1] - self.history["loglik"][-2]).item() if loglik_diff >= tol_loglik: return False return True
[docs] def update(self, theta, delta, eta, lb=-torch.inf, ub=torch.inf): r""" Apply a damped AI step and clip parameters to bounds. .. math:: \boldsymbol{\theta} \leftarrow \mathrm{clip}(\boldsymbol{\theta} + \eta \Delta,\, \text{lb},\, \text{ub}) Args: theta (torch.Tensor): Current parameter tensor. delta (torch.Tensor): AI step :math:`\Delta = \symbf{AI}^{-1}\symbf{s}`. eta (float): Step size (learning rate). lb (float, optional): Lower bound for clipping. Defaults to ``-inf``. ub (float, optional): Upper bound for clipping. Defaults to ``inf``. Returns: tuple: ``(theta, update)``, where ``theta`` is the updated parameter tensor and ``update`` is the actual change after clipping. """ last_theta = theta theta = theta + delta * eta theta = torch.clamp(theta, min=lb, max=ub) return theta, theta - last_theta
    def optimize(self, y, x, theta, max_iter=200, eta=1.0, require_loglik=True, lb=-torch.inf, ub=torch.inf, verbose=0, check_score=True, check_delta=True, check_loglik=True, tol_score=1e-4, tol_delta=1e-4, tol_loglik=1e-4):
        """Run the AI-REML optimisation loop.

        Iterates :meth:`ai_step` and :meth:`update` until convergence or
        ``max_iter`` is reached. Optimisation history is stored in
        :attr:`history` and can be queried afterwards via :meth:`get_theta`
        and :meth:`get_beta`.

        Args:
            y (torch.Tensor): Response vector of shape ``(n,)``.
            x (torch.Tensor): Design matrix of shape ``(n, p)``.
            theta (torch.Tensor): Initial variance component parameter tensor.
            max_iter (int, optional): Maximum number of iterations. Defaults
                to ``200``.
            eta (float, optional): Step size applied to each AI update.
                Defaults to ``1.0``.
            require_loglik (bool, optional): Whether to evaluate the REML
                log-likelihood at each iteration. Defaults to ``True``.
            lb (float, optional): Lower bound for parameter clipping.
                Defaults to ``-inf``.
            ub (float, optional): Upper bound for parameter clipping.
                Defaults to ``inf``.
            verbose (int, optional): Verbosity level. ``0`` suppresses all
                output, ``1`` shows a progress bar, ``2`` additionally prints
                per-iteration diagnostics. Defaults to ``0``.
            check_score (bool, optional): Include score norm in convergence
                check. Defaults to ``True``.
            check_delta (bool, optional): Include parameter update norm in
                convergence check. Defaults to ``True``.
            check_loglik (bool, optional): Include log-likelihood change in
                convergence check. Defaults to ``True``.
            tol_score (float, optional): Score norm tolerance. Defaults to
                ``1e-4``.
            tol_delta (float, optional): Parameter update norm tolerance.
                Defaults to ``1e-4``.
            tol_loglik (float, optional): Log-likelihood change tolerance.
                Defaults to ``1e-4``.

        Returns:
            tuple: ``(theta, beta, n_iter)``, where ``theta`` is the final
            parameter estimate, ``beta`` is the corresponding fixed-effect
            estimate, and ``n_iter`` is the number of iterations completed.
        """
        # Fresh history for this run; consumed by is_converged/get_theta/get_beta.
        self.history = {"theta": [], "beta": [], "loglik": [], "score": [], "ai": [], "delta": [], "update": []}
        pb = tqdm(disable=not verbose, bar_format="{desc} \u23F1 {elapsed} | \u26A1 {rate_fmt}")
        # Variance components are estimated by fixed-point iteration, not by
        # backprop, so gradient tracking is disabled for the whole loop.
        with torch.no_grad():
            for i in range(max_iter):
                beta, score, ai, loglik = self.ai_step(y, x, theta, require_loglik=require_loglik)
                # Quasi-Newton step Delta = AI^{-1} s, solved by least squares
                # so a rank-deficient AI matrix does not abort the iteration.
                delta = torch.linalg.lstsq(ai, score.unsqueeze(-1)).solution.squeeze()
                theta, update = self.update(theta, delta, eta, lb, ub)
                # NOTE(review): "theta" is appended post-update while "beta",
                # "loglik", "score" were evaluated at the pre-update theta, so
                # history entries at the same index are offset by one step —
                # confirm before relying on get_theta(select="best").
                self.history["theta"].append(theta)
                self.history["beta"].append(beta)
                self.history["loglik"].append(loglik)
                self.history["score"].append(score)
                self.history["ai"].append(ai)
                self.history["delta"].append(delta)
                self.history["update"].append(update)
                if verbose > 0:
                    pb.set_description(f"Iter {i + 1}")
                    pb.update(1)
                if verbose > 1:
                    # Per-iteration diagnostics: score norm, raw step norm,
                    # learning rate, realised (clipped) step norm.
                    write_str = f"\u2225\u2207\u2225: {torch.norm(score):12.4f}, \u2225\u0394\u2225: {torch.norm(delta):6.4f}, \u03B7: {eta:.2f}, \u2225\u0394\u1D9C\u2225: {torch.norm(update):6.4f}"
                    if require_loglik:
                        if len(self.history["loglik"]) > 1:
                            delta_loglik = self.history["loglik"][-1].item() - self.history["loglik"][-2].item()
                            write_str += f", log \U0001D4DB: {loglik:8.4f} ({delta_loglik:+.4f})"
                        else:
                            write_str += f", log \U0001D4DB: {loglik:8.4f}"
                    if i == 0:
                        tqdm.write("")
                    tqdm.write(write_str)
                if self.is_converged(check_score, check_delta, check_loglik, tol_score, tol_delta, tol_loglik):
                    if verbose > 0:
                        if verbose > 1:
                            # Legend for the diagnostic columns printed above.
                            tqdm.write(f"\n[\u2207: score, \u0394: \U0001D409\u207B\u00B9\u2207, \u03B7: learning rate, \u0394\u1D9C: clip(\U0001D6C9 + \u03B7\u0394, lb, ub) - \U0001D6C9, \U0001D4DB: restricted likelihood]")
                        tqdm.write(f"\n\u2713 Converged at iteration {i + 1}")
                    break
        pb.close()
        # NOTE(review): max_iter < 1 leaves i and beta unbound, so this return
        # would raise NameError — confirm callers always pass max_iter >= 1.
        return theta, beta, i + 1