Source code for lib5c.algorithms.spline_normalization

"""
Module for fitting b-splines to 5C counts data as a method of bias correction.
"""

import numpy as np
from scipy.interpolate import LSQBivariateSpline

from lib5c.util.counts import parallel_subtract_counts, abs_diff_counts,\
    norm_counts, parallel_log_counts, parallel_unlog_counts
from lib5c.util.mathematics import gmean


[docs]class DiscreteBivariateEmpiricalSurface(object): def __init__(self, xs, ys, zs): # coerce to np array xs = np.array(xs) ys = np.array(ys) zs = np.array(zs) # identify unique values self.range_x = np.unique(xs) self.range_y = np.unique(ys) # compute empirical surface surface = {(x, y): np.nanmean(zs[(xs == x) & (ys == y)]) for x in self.range_x for y in self.range_y} # fill nan's in the surface self.surface = dict(surface) for x in self.range_x: for y in self.range_y: if not np.isfinite(self.surface[x, y]): self.surface[x, y] = np.nanmean( [surface[x_prime, y] for x_prime in self.range_x] + [surface[x, y_prime] for y_prime in self.range_y])
[docs] def ev(self, x, y): return self.surface[x, y]
[docs]def fit_spline(counts_list, primermap, bias_factor, knots=10, asymmetric=False): """ Fits a 2-D cubic b spline surface to the counts data as a function of the specified upstream and downstream bias factors. Parameters ---------- counts_list : List[Dict[str, np.ndarray]] The counts data to fit the splines with. primermap : Dict[str, List[Dict[str, Any]]] The primermap describing the loci. The ``bias_factor`` must be a key of the inner dict. bias_factor : str The bias factor to fit the model with. knots : Optional[int] The number of knots to use for the spline. If the bias factor is discrete, pass 0 to use an empirical discrete surface instead of a spline. log : Optional[bool] Pass true to fit the spline to logged data. asymmetric : Optional[bool] Pass True to iterate over only the upper triangular entries of the counts matrices. The default is False, which iterates over the whole counts matrices. Returns ------- Tuple[LSQBivariateSpline, Dict[str, np.ndarray], List[Dict[str, np.ndarray]]] The first element of the tuple is the spline surface fit to the data. The second element contains the values of the spline surface evaluated at each point in the original counts dict. The third element contains the bias-corrected counts dicts. """ # infer regions regions = list(counts_list[0].keys()) # construct arrays x = [] y = [] z = [] for counts in counts_list: for region in regions: for i in range(len(counts[region])): if asymmetric: jrange = range(i + 1) else: jrange = range(len(counts[region])) for j in jrange: if np.isfinite(counts[region][i, j]): x.append(primermap[region][i][bias_factor]) y.append(primermap[region][j][bias_factor]) z.append(counts[region][i, j]) # fit spline or empirical surface if knots: # place knots x_unique = np.unique(x) x_knots = np.percentile(x, np.linspace(0, 100, knots)) \ if len(x_unique) > knots else np.linspace(min(x), max(x), knots) y_unique = np.unique(y) y_knots = np.percentile(y, np.linspace(0, 100, knots)) \ if len(y_unique) > knots else np.linspace(min(y), max(y), knots) # fit spline spline = LSQBivariateSpline(x, y, z, x_knots, y_knots) else: spline = DiscreteBivariateEmpiricalSurface(x, y, z) # evaluate spline spline_values = {region: np.copy(counts_list[0][region]) for region in regions} for region in regions: for i in range(len(spline_values[region])): for j in range(i + 1): spline_value = spline.ev(primermap[region][i][bias_factor], primermap[region][j][bias_factor]) if not np.isfinite(spline_value): raise ValueError('infinite spline value, probably due to ' 'too many knots') spline_values[region][i, j] = spline_value spline_values[region][j, i] = spline_value # correct original matrix corrected = [parallel_subtract_counts(counts, spline_values) for counts in counts_list] return spline, spline_values, corrected
[docs]def iterative_spline_normalization(counts_list, exp_list, primermap, bias_list, max_iter=100, eps=1e-4, knots=10, log=True, asymmetric=False): """ Convenience function for iteratively applying a set of spline normalization steps to a set of counts dicts. Parameters ---------- counts_list : List[Dict[str, np.ndarray]] A list of observed counts dicts to normalize. exp_list : List[Dict[str, np.ndarray]] A list of expected counts dicts corresponding to the counts dicts in ``counts_list``. primermap : Dict[str, List[Dict[str, Any]]] Primermap or pixelmap describing the loci in this region. bias_list : List[str] A list of bias factors to remove from the counts. These strings must match metadata keys in ``primermap``. That is to say, if ``bias_list`` is ``['length']`` then we expect ``primermap[region][i]['length']`` to be a number representing the length of the ``i`` th fragment in the region specified by ``region``. If multiple bias factors are specified, the algorithm will iteratively remove all of them from the data. max_iter : int The maximum number of iterations when iterating between bias factors. eps : float When the relative change in all models drops below this value convergence is declared. knots : Union[int, List[int]] Specifies the number of knots to put into the splines. Pass a single int to use the same number of knots in each model. Pass a list of ints of length equal to the length of ``bias_list`` to use ``knots[i]`` knots for the bias factor named ``bias_list[i]``. If a bias factor is discrete, pass 0 for its knot number to use an empirical discrete surface instead of a spline. log : bool Pass True to fit the splines to log-scale data, reducing the effects of outliers. asymmetric : bool Pass True to construct models using only the upper-triangular elements of the counts matrices, which can lead to asymmetric models. By default, the algorithm iterates over all elements of the counts matrices, enforcing symmetry in the bias models but incurring some redundancy in the actual counts information. Returns ------- Tuple[Dict[str, scipy.interpolate.BivariateSpline], List[Dict[str, np.ndarray]], List[Dict[str, np.ndarray]]] The first element of the tuple is a dict mapping the bias factors specified in ``bias_list`` to BivariateSpline instances. The second element in the tuple is a dict mapping the bias factors specified in ``bias_list`` to counts dicts containing the evaluations of the spline fit to that bias factor at each point in the list of input counts dicts. The third element of the tuple is the normalized list of counts. """ # deduce regions regions = list(counts_list[0].keys()) # the change in each model the last time it was updated deltas = np.ones(len(bias_list)) # the latest version of each model, intially they are all ones models = {bias: {region: np.zeros_like(counts_list[0][region], dtype=float) for region in regions} for bias in bias_list} # spline representations of the models, initially they are None splines = {} # experimental: if one of the biases is 'gmean', add the geometric row means # afted distance dependence subtraction as a bias factor if 'gmean' in bias_list: partially_corrected_counts_list = [ parallel_subtract_counts(counts_list[j], exp_list[j]) for j in range(len(counts_list))] for region in regions: for i in range(len(counts_list[0][region])): primermap[region][i]['gmean'] = gmean(np.concatenate( [partially_corrected_counts_list[j][region][i, :] for j in range(len(counts_list))])) # log if appropriate if log: counts_list = [parallel_log_counts(counts) for counts in counts_list] exp_list = [parallel_log_counts(counts) for counts in exp_list] # the current iteration number i = 0 while i < max_iter and np.any(deltas > eps): # select a model to update selected_model = i % len(bias_list) print('iteration %i, updating %s' % (i, bias_list[selected_model])) # correct the counts for all models except this one partially_corrected_counts_list = [ parallel_subtract_counts(*[counts_list[j], exp_list[j]] + [models[bias_list[k]] for k in range(len(bias_list)) if k != selected_model]) for j in range(len(counts_list))] # resolve knots if type(knots) == dict: resolved_knots = knots[bias_list[selected_model]] elif type(knots) == list: resolved_knots = knots[selected_model] else: resolved_knots = knots # update selected model based on this data try: new_spline, new_model, _ = fit_spline( partially_corrected_counts_list, primermap, bias_list[selected_model], knots=resolved_knots, asymmetric=asymmetric) except ValueError: if i >= len(bias_list) - 1: print('early termination due to infinite spline values') print('all bias factors went through at least one iteration') break else: raise ValueError('infinite spline value before all biases were ' 'considered, probably due to too many knots') # update deltas adf = abs_diff_counts(new_model, models[bias_list[selected_model]]) deltas[selected_model] = norm_counts(adf) / \ norm_counts(models[bias_list[selected_model]]) print('delta: %s' % deltas[selected_model]) # update model models[bias_list[selected_model]] = new_model # save spline splines[bias_list[selected_model]] = new_spline # increment iteration number i += 1 corrected_counts_list = [ parallel_subtract_counts(*[counts_list[j]] + [models[bias_list[k]] for k in range(len(bias_list))]) for j in range(len(counts_list))] # unlog if appropriate if log: corrected_counts_list = [parallel_unlog_counts(counts) for counts in corrected_counts_list] return splines, models, corrected_counts_list