Source code for lib5c.algorithms.express

"""
Module for implementation of the "Express" algorithm from Sauria et al. 2015.
"""

import numpy as np

from lib5c.util.parallelization import parallelize_regions


@parallelize_regions
def express_normalize_matrix(obs_matrix, exp_matrix, max_iter=1000, eps=1e-4):
    """
    Express balance a matrix given a corresponding expected matrix.

    Parameters
    ----------
    obs_matrix : np.ndarray
        The matrix to normalize.
    exp_matrix : np.ndarray
        The expected matrix corresponding to the obs_matrix.
    max_iter : int
        The maximum number of iterations.
    eps : float
        When the fractional change in the residual is less than this number,
        the algorithm is considered to have converged and will stop iterating.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, List[float]]
        The first element of the tuple is the normalized matrix. The second
        element is the multiplicative bias vector. The third element is a list
        containing the L1 norm of the residual at every iteration.
    """
    # log transform
    log_obs = np.log(obs_matrix + 1)
    log_exp = np.log(exp_matrix + 1)

    # initialize bias vector
    bias_factors = np.zeros(len(log_obs))

    # counter for iterations
    i = 0

    # variables to track L1 error
    prev_err = np.inf
    curr_err = np.inf
    errs = []

    while i < max_iter and not (prev_err - curr_err) / prev_err < eps:
        # apply bias to expected
        log_corr_exp = \
            (log_exp.T + bias_factors).T + bias_factors

        # compute residual
        residual = log_obs - log_corr_exp

        # update each element of the bias vector
        for j in range(len(log_obs)):
            filtered_residual = \
                residual[:, j][(np.isfinite(residual[:, j])) &
                               (log_obs[:, j] != 0)]
            update = np.sum(filtered_residual) / (2 * len(filtered_residual))
            if np.isfinite(update):
                bias_factors[j] += update

        # increment iteration counter
        i += 1

        # update error
        prev_err = curr_err
        curr_err = np.linalg.norm(residual[np.isfinite(residual)], ord=1)
        errs.append(curr_err)

    # apply optimized bias to observed
    log_corr_obs = (log_obs.T - bias_factors).T - bias_factors

    # undo log transform
    normalized_matrix = np.exp(log_corr_obs) - 1
    bias_factors = np.exp(bias_factors)

    # force normalized matrix to be non-negative
    normalized_matrix[normalized_matrix < 0] = 0

    return normalized_matrix, bias_factors, errs
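
# Minimal usage sketch (illustration only, not part of the library API): it
# balances one synthetic region against a flat expected model. The flat
# expected matrix and the injected locus biases below are assumptions made
# for demonstration, and the sketch assumes the parallelize_regions wrapper
# passes plain ndarray arguments through unchanged.
if __name__ == '__main__':  # pragma: no cover
    rng = np.random.RandomState(42)
    n = 20
    demo_exp = np.full((n, n), 10.0)                  # flat expected model
    demo_bias = np.exp(0.2 * rng.randn(n))            # synthetic locus biases
    demo_obs = demo_exp * np.outer(demo_bias, demo_bias)
    demo_norm, demo_bias_est, demo_errs = express_normalize_matrix(
        demo_obs, demo_exp)
    print('single-matrix residuals: %g -> %g' % (demo_errs[0], demo_errs[-1]))
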
@parallelize_regions
def joint_express_normalize(obs_matrices, exp_matrices, max_iter=1000,
                            eps=1e-4):
    """
    Express balance a set of matrices given a set of corresponding expected
    matrices, using a single shared bias vector.

    Parameters
    ----------
    obs_matrices : List[np.ndarray]
        The matrices to normalize.
    exp_matrices : List[np.ndarray]
        The expected matrices corresponding to the obs_matrices.
    max_iter : int
        The maximum number of iterations.
    eps : float
        When the fractional change in the residual is less than this number,
        the algorithm is considered to have converged and will stop iterating.

    Returns
    -------
    Tuple[List[np.ndarray], np.ndarray, List[float]]
        The first element of the tuple is the list of normalized matrices. The
        second element is the multiplicative bias vector. The third element is
        a list containing the L1 norm of the residual at every iteration.
    """
    # log transform
    log_obs_matrices = [np.log(obs_matrix + 1) for obs_matrix in obs_matrices]
    log_exp_matrices = [np.log(exp_matrix + 1) for exp_matrix in exp_matrices]

    # initialize bias vector
    bias_factors = np.zeros(len(log_obs_matrices[0]))

    # counter for iterations
    i = 0

    # variables to track L1 error
    prev_err = np.inf
    curr_err = np.inf
    errs = []

    while i < max_iter and not (prev_err - curr_err) / prev_err < eps:
        # apply bias to expected
        log_corr_exp_matrices = [
            (log_exp_matrix.T + bias_factors).T + bias_factors
            for log_exp_matrix in log_exp_matrices]

        # concatenate
        log_obs = np.concatenate([log_obs_matrices[k]
                                  for k in range(len(obs_matrices))])
        log_corr_exp = np.concatenate([log_corr_exp_matrices[k]
                                       for k in range(len(obs_matrices))])

        # compute residual
        residual = log_obs - log_corr_exp

        # update each element of the bias vector
        for j in range(len(log_obs_matrices[0])):
            filtered_residual = \
                residual[:, j][(np.isfinite(residual[:, j])) &
                               (log_obs[:, j] != 0)]
            update = np.sum(filtered_residual) / (2 * len(filtered_residual))
            if np.isfinite(update):
                bias_factors[j] += update

        # increment iteration counter
        i += 1

        # update error
        prev_err = curr_err
        curr_err = np.linalg.norm(residual[np.isfinite(residual)], ord=1)
        errs.append(curr_err)

    # apply optimized bias to observed
    log_corr_obs_matrices = [(log_obs_matrix.T - bias_factors).T - bias_factors
                             for log_obs_matrix in log_obs_matrices]

    # undo log transform
    normalized_matrices = [np.exp(log_corr_obs_matrix) - 1
                           for log_corr_obs_matrix in log_corr_obs_matrices]
    bias_factors = np.exp(bias_factors)

    # force normalized matrices to be non-negative
    for normalized_matrix in normalized_matrices:
        normalized_matrix[normalized_matrix < 0] = 0

    return normalized_matrices, bias_factors, errs
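
# Minimal usage sketch for the joint variant (illustration only): two
# synthetic "replicate" matrices that share one bias vector are balanced
# against their expected models with a single shared correction. The
# synthetic inputs are assumptions for demonstration, and the sketch assumes
# the parallelize_regions wrapper passes plain list/ndarray arguments through
# unchanged.
if __name__ == '__main__':  # pragma: no cover
    rng = np.random.RandomState(0)
    n = 20
    demo_exp = np.full((n, n), 10.0)                  # flat expected model
    demo_bias = np.exp(0.2 * rng.randn(n))            # shared locus biases
    demo_obs_a = demo_exp * np.outer(demo_bias, demo_bias)
    demo_obs_b = 2 * demo_exp * np.outer(demo_bias, demo_bias)
    demo_norms, demo_bias_est, demo_errs = joint_express_normalize(
        [demo_obs_a, demo_obs_b], [demo_exp, 2 * demo_exp])
    print('joint residuals: %g -> %g' % (demo_errs[0], demo_errs[-1]))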