Source code for lib5c.algorithms.variance.mle

import numpy as np
import scipy.stats as stats
from scipy.optimize import minimize

from lib5c.util.parallelization import parallelize_regions
from lib5c.util.counts import flatten_and_filter_counts
from lib5c.algorithms.variance.nbinom_dispersion import nb_nll, \
    dispersion_to_variance as nbinom_disp_to_var
from lib5c.algorithms.variance.lognorm_dispersion import \
    dispersion_to_variance as lognorm_disp_to_var


def mle_variance(obs, exp, model='lognorm', min_obs=2, min_dist=6,
                 regional=False):
    """
    Fits a single point estimate of the dispersion across all regions (or
    within each region separately, if ``regional=True``) under the selected
    model, and returns the converted variance estimates.

    Parameters
    ----------
    obs, exp : dict of np.ndarray
        The counts dicts of observed and expected data. Keys are region
        names, values are square, symmetric count matrices.
    model : {'lognorm', 'loglogistic', 'nbinom', 'norm'}
        The statistical model to use for MLE point estimation. Passing
        'poisson' raises a ValueError, since the Poisson variance is simply
        the expected value.
    min_obs : float
        Fit only points with at least this many observed counts.
    min_dist : int
        Fit only points with at least this interaction distance, in bin
        units.
    regional : bool
        Pass True to fit a separate point estimate for each region.

    Returns
    -------
    dict of np.ndarray
        The variance estimates.
    """
    if regional:
        # fit each region independently, forwarding all filter settings
        return parallelize_regions(mle_variance)(
            obs, exp, model=model, min_obs=min_obs, min_dist=min_dist)
    original_exp = exp  # save the full square, symmetric matrices
    flattened_counts, _, _ = flatten_and_filter_counts(
        {'obs': obs, 'exp': exp},
        min_filters={'obs': min_obs, 'dist': min_dist})
    obs = flattened_counts['obs']
    exp = flattened_counts['exp']
    if model == 'nbinom':
        # the negative binomial is discrete, so round the observed values
        # down to integers before evaluating the likelihood
        obs = np.floor(obs)
        res = minimize(nb_nll, [0.1], args=(obs, exp), method='Nelder-Mead')
        if not res.success:
            print(res.message)
            raise ValueError('optimization failed')
        disp = res.x  # length-1 array holding the fitted dispersion
        if isinstance(original_exp, dict):
            return {r: nbinom_disp_to_var(disp, original_exp[r])
                    for r in original_exp}
        return nbinom_disp_to_var(disp, original_exp)
    elif model in ['lognorm', 'loglogistic']:
        # fit a normal or logistic distribution to the log-scale residuals;
        # its fitted variance serves as the dispersion estimate
        dist_gen = getattr(stats, model[3:])
        disp = dist_gen(
            *dist_gen.fit(np.log1p(obs) - np.log1p(exp))).stats('v')
        if isinstance(original_exp, dict):
            return {r: lognorm_disp_to_var(disp, original_exp[r])
                    for r in original_exp}
        return lognorm_disp_to_var(disp, original_exp)
    elif model == 'norm':
        # fit a normal distribution to the count-scale residuals; the
        # resulting variance is constant across all points
        dist_gen = getattr(stats, model)
        var = dist_gen(*dist_gen.fit(obs - exp)).stats('v')
        if isinstance(original_exp, dict):
            return {r: np.tile(var, original_exp[r].shape)
                    for r in original_exp}
        return np.tile(var, original_exp.shape)
    elif model == 'poisson':
        raise ValueError('use var=exp for poisson variance')
    else:
        raise ValueError('unknown model %s' % model)
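
For orientation, a minimal usage sketch follows. It is not part of the module: the region name 'r1' and the 4x4 count matrices are fabricated for illustration, and min_dist is lowered to 1 because the default of 6 would filter every point out of such a small toy matrix.

import numpy as np
from lib5c.algorithms.variance.mle import mle_variance

# fabricated toy data: one region, square symmetric count matrices
obs = {'r1': np.array([[10.,  8.,  5.,  3.],
                       [ 8., 12.,  7.,  4.],
                       [ 5.,  7., 11.,  6.],
                       [ 3.,  4.,  6.,  9.]])}
exp = {'r1': np.full((4, 4), 7.)}

# one lognormal dispersion estimate across all (here, one) regions
var = mle_variance(obs, exp, model='lognorm', min_dist=1)
print(var['r1'].shape)  # (4, 4): one variance estimate per point

Passing regional=True instead would fit and return a separate point estimate for each region's matrix.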
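The 'nbinom' branch delegates the likelihood to lib5c's nb_nll and optimizes a single dispersion parameter with Nelder-Mead. As a hedged sketch only (not lib5c's actual implementation), a typical NB2 negative log-likelihood with variance mu + alpha * mu**2 can be written using scipy's nbinom parameterization, where n = 1/alpha and p = n / (n + mu):

import numpy as np
import scipy.stats as stats

def nb2_nll(params, obs, mu):
    """Sketch of an NB2 negative log-likelihood; alpha = params[0]."""
    alpha = params[0]
    if alpha <= 0:
        return np.inf  # keep the unconstrained optimizer in the valid domain
    n = 1. / alpha
    p = n / (n + mu)
    return -np.sum(stats.nbinom.logpmf(obs, n, p))

Under this parameterization the dispersion-to-variance conversion is simply var = mu + alpha * mu**2, which matches the role dispersion_to_variance plays in the nbinom branch above (the exact conversion lives in lib5c.algorithms.variance.nbinom_dispersion).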