Source code for lib5c.algorithms.knight_ruiz

"""
Knight-Ruiz algorithm.

Transcribed from the MATLAB source provided in Rao et al. 2014 by Dan Gillis.
"""

import numpy as np

from lib5c.util.parallelization import parallelize_regions
from lib5c.util.counts import flatten_regional_counts, impute_values
from lib5c.util.mathematics import gmean


[docs]def strip_zero_rows_columns_sym_mat(sym_mat): """ Given symmetric 2D numpy array sym_mat, removes rows and columns that have no non-zero entries """ zero_cols = np.where(np.all(sym_mat == 0, axis=1)) reduced_mat = np.delete(sym_mat, zero_cols, axis=1) reduced_mat = np.delete(reduced_mat, zero_cols, axis=0) return reduced_mat
[docs]@parallelize_regions def kr_balance_matrix(matrix, max_iter=3000, retain_scale=True, imputation_size=0): """ Convenience function for applying KR balancing to a counts matrix. Parameters ---------- matrix : np.ndarray The matrix to balance. max_iter : int The maximum number of iterations to try. retain_scale : bool Pass True to rescale the results to the scale of the original matrix using a ratio of geometric means. imputation_size : int Pass an int greater than 0 to replace NaN's in the matrix with a local median approximation. Pass 0 to skip imputation. Returns ------- Tuple[np.ndarray, np.ndarray, np.ndarray] The first array contains the balanced matrix. The second contains the bias vector. The third contains the residual. """ imputed_matrix = impute_values(matrix, size=imputation_size) bias_vector, errs = kr_balance(imputed_matrix, max_iter=max_iter) balanced = balance_matrix(matrix, bias_vector) if retain_scale: # temporary heuristic for scaling the kr results to the original scale factor = gmean(np.array(flatten_regional_counts(matrix))) * len(matrix) balanced *= factor bias_vector *= np.sqrt(factor) balanced[~np.isfinite(matrix)] = np.nan return balanced, bias_vector[:, 0], errs
[docs]@parallelize_regions def balance_matrix(matrix, bias_vector, invert=False): """ Balance a matrix given the appropriate multiplicative bias vector. Parameters ---------- matrix : np.ndarray bias_vector : np.ndarray invert : Optional[bool] Pass True to invert the bias vector before balancing. Returns ------- np.ndarray The balanced matrix. """ if invert: bias_vector = 1.0 / bias_vector return bias_vector.T * matrix * bias_vector
[docs]@parallelize_regions def kr_balance(array, tol=np.float_(1e-6), x0=None, delta=0.1, ddelta=3, fl=0, max_iter=3000): """ Performs Knight-Ruiz matrix balancing algorithm on a 2D symmetric numpy array. Note that this function does not check for symmetry of the array - this function may not converge if given non-symmetric matrix. Note also that this function does not return balanced matrix - it returns the entries of the diagonal matrix that should be multiplied on either side of ``array`` to get balanced matrix. Parameters ---------- array : np.ndarray Matrix to balance. Should be square and symmetric. tol : float Parameter related to tolerance x0 : Optional[np.ndarray] The initial guess to use for the bias vector. If not passed, a vector of all 1's will be used. delta : float Parameter related to learning rate. ddelta : float Parameter related to learning rate. fl : int Adjusts the verbosity of command line output. max_iter : Optional[int] The maximum number of iterations. Pass None to set no limit. Returns ------- Tuple[np.ndarray] The first element is the bias vector, the second is the residual. Examples -------- >>> import numpy as np >>> X = np.reshape(range(16), (4, 4)).astype(float) >>> counts = X + X.T >>> counts array([[ 0., 5., 10., 15.], [ 5., 10., 15., 20.], [10., 15., 20., 25.], [15., 20., 25., 30.]]) >>> counts.sum(axis=1) array([30., 50., 70., 90.]) >>> x, res = kr_balance(counts) >>> balanced = x.T * counts * x >>> balanced array([[0. , 0.26604444, 0.34729636, 0.3866592 ], [0.26604444, 0.2489703 , 0.24375574, 0.24122952], [0.34729636, 0.24375574, 0.21213368, 0.19681423], [0.3866592 , 0.24122952, 0.19681423, 0.17529705]]) >>> balanced.sum(axis=1) array([1., 1., 1., 1.]) >>> for i in range(len(counts)): ... for j in range(i+1): ... if i % 2 == j % 2: ... counts[i, j] = 0.0 ... counts[j, i] = 0.0 >>> counts array([[ 0., 5., 0., 15.], [ 5., 0., 15., 0.], [ 0., 15., 0., 25.], [15., 0., 25., 0.]]) >>> x, res = kr_balance(counts) >>> balanced = x.T * counts * x >>> balanced array([[0. , 0.42705098, 0. , 0.57294902], [0.42705098, 0. , 0.57294902, 0. ], [0. , 0.57294902, 0. , 0.42705098], [0.57294902, 0. , 0.42705098, 0. ]]) >>> balanced.sum(axis=1) array([1., 1., 1., 1.]) """ dims = array.shape if dims[0] != dims[1] or len(dims) > 2: raise Exception n = dims[0] e = np.ones((n, 1)) res = np.array([]) if x0 is None: x0 = e.copy() g = np.float_(0.9) eta_max = 0.1 eta = eta_max stop_tol = tol * 0.5 # x0 is not used again, so do not need to copy object x = x0 # n x 1 array rt = tol ** 2 v = x0 * np.dot(array, x) # n x 1 array rk = 1 - v # n x 1 array rho_km1 = np.dot(np.transpose(rk), rk)[0, 0] # scalar value # no need to copy next two objects - are scalar values rout = rho_km1 rold = rout mvp = 0 i = 0 if fl == 1: print('it in. it res') while rout > rt: if max_iter is not None and i > max_iter: break i += 1 k = 0 y = np.ones((n, 1)) innertol = max(eta ** 2 * rout, rt) while rho_km1 > innertol: k += 1 # print('{} {} {}'.format(i, k, innertol)) # print(rk) if k == 1: z = rk / v p = z.copy() rho_km1 = np.dot(np.transpose(rk), z) else: beta = rho_km1 / rho_km2 p = z + beta * p w = x * np.dot(array, x * p) + v * p alpha = rho_km1 / np.dot(np.transpose(p), w) ap = alpha * p ynew = y + ap if np.min(ynew) <= delta: if delta == 0: break ind = np.where(ap < 0) gamma = np.min((delta - y[ind]) / ap[ind]) y = y + gamma * ap break if np.max(ynew) >= ddelta: # see above - need to code # also check that (i.e. gamma or ap is scalar) ind = np.where(ynew > ddelta) gamma = min((ddelta - y[ind]) / ap[ind]) y = y + gamma * ap break y = ynew rk = rk - alpha * w rho_km2 = rho_km1 z = rk / v rho_km1 = np.dot(np.transpose(rk), z) # print(rho_km1) x = x * y v = x * np.dot(array, x) rk = 1 - v rho_km1 = np.dot(np.transpose(rk), rk)[0, 0] # scalar value rout = rho_km1 mvp = mvp + k + 1 rat = rout / rold rold = rout res_norm = np.sqrt(rout) eta_0 = eta eta = g * rat if g * eta_0 ** 2 > 0.1: eta = max(eta, g * eta_0 ** 2) eta = max(min(eta, eta_max), stop_tol / res_norm) if fl == 1: print('{} {} {:.3e}'.format(i, k, res_norm)) res = np.append(res, res_norm) return x, res