# Source code for lib5c.algorithms.qnorm

"""
Module for quantile normalization.

Original author of :func:`qnorm()`, :func:`_rank_data()`,
:func:`_average_rows()`, and :func:`_sub_in_normed_val()`: Dan Gillis

Note: data matrices in these functions are typically expected to be arranged
with each column representing one replicate, except for the functions
:func:`_rank_data()`, :func:`_average_rows()`, and :func:`_sub_in_normed_val()`,
which expect them to be arranged with each row representing one replicate.

The exposed functions are :func:`qnorm`, :func:`qnorm_parallel`,
:func:`qnorm_fast`, :func:`qnorm_fast_parallel`, and the convenience function
:func:`qnorm_counts_superdict`.
"""

import numpy as np

from lib5c.util.sorting import rankdata_plus
from lib5c.util.parallelization import parallelize_regions
from lib5c.util.counts_superdict import counts_superdict_to_matrix
from lib5c.util.counts import unflatten_counts_from_list,\
    flatten_regional_counts, unflatten_regional_counts


def _rank_data(data):
    """
    Sorts each replicate's values, computes ranks, and locates all runs of
    tied values.

    Note that despite the local variable name ``column``, this function
    iterates ``data`` row-wise: each row of ``data`` is expected to hold one
    replicate (see the module docstring).

    Parameters
    ----------
    data : 2d numpy array
        The data to rank, arranged with one replicate per row.

    Returns
    -------
    tuple of (list of 1d array, 2d array, list of list of numeric)
        The first element of the tuple is a per-replicate sorted
        representation of ``data``. The second element of the tuple contains
        the ranks of the entries of ``data`` in their original order; tied
        entries all carry the sorted-position index of the first (lowest)
        member of their tie run. The third element of the tuple contains, for
        each replicate, the boundary indices that separate runs of equal
        sorted values (the last entry is always ``len(column)``).
    """
    sorted_data = []
    tied_rank_idx = []
    ranks = np.zeros((len(data), len(data[0])))

    col_num = -1
    # sort data and find ties in each column
    for column in data:
        col_num += 1
        sort_idx = np.argsort(column)
        # get indices to revert sorted back to unsorted order
        rev_sort_idx = np.zeros(len(column), dtype='int32')
        rev_sort_idx[sort_idx] = np.arange(len(column), dtype='int32')
        sorted_col = np.asarray(column)
        sorted_col = sorted_col[sort_idx]

        sorted_data.append(list(sorted_col))

        # find ties by seeing how many consecutive sorted entries have same
        # value
        # get start site and end site, where end site is first entry that does
        # not have the same value as previous entry (or entries)
        # note: even entries with unique values are considered ties for these
        # purposes (i.e. a tie between one object)
        ind_start = 0  # start of current value
        ind_value = sorted_col[0]  # current value
        tied_rank_idx.append([])
        final_idx = len(column)-1
        # scan interior positions; position final_idx is handled after the
        # loop so the closing boundary can be recorded exactly once
        for i in range(1, final_idx):
            if sorted_col[i] != ind_value:
                # assign all tied entries the same value
                ranks[col_num][ind_start:i] = ind_start
                # add start site of current tie
                tied_rank_idx[col_num].append(ind_start)
                ind_value = sorted_col[i]
                ind_start = i
        # handle final case separately
        if sorted_col[final_idx] != ind_value:
            # last value is unique: close the previous run, then give the
            # final entry its own single-element run
            ranks[col_num][ind_start:final_idx] = ind_start
            ranks[col_num][final_idx] = final_idx
            tied_rank_idx[col_num].append(ind_start)
            tied_rank_idx[col_num].append(final_idx)  # final value start site
            tied_rank_idx[col_num].append(final_idx+1)  # final value end site
        else:
            # last value ties with the current run: extend it to the end
            ranks[col_num][ind_start:final_idx+1] = ind_start
            tied_rank_idx[col_num].append(ind_start)
            tied_rank_idx[col_num].append(final_idx+1)

        # gives order of ranks for original column
        ranks[col_num] = ranks[col_num][rev_sort_idx]

    return sorted_data, ranks, tied_rank_idx


def _average_rows(data, reference_index=None):
    """
    Find average value for each row of sorted data.

    Parameters
    ----------
    data : list of 1d numpy array
        The data to average.
    reference_index : Optional[int]
        Pass an index into the outer list of ``data`` to serve as the reference
        distribution instead of averaging.

    Returns
    -------
    1d array
        The average of each row of data.
    """
    data_mat = np.zeros((len(data[0]), len(data)))
    # put data into numpy 2-D array
    for col_num in range(0, len(data)):
        data_mat[:, col_num] = data[col_num]
    # find average of rows
    if reference_index is None:
        average_col = np.mean(data_mat, 1)
    else:
        average_col = data_mat[:, reference_index]
    return average_col


def _sub_in_normed_val(data, ranks, average_col, tied_rank_idx, tie):
    """
    Returns data matrix with normalized values. Handles ties according to the
    value passed in the ``tie`` kwarg.

    Parameters
    ----------
    data : list of list of numeric
        The column-sorted data to sub in values for.
    ranks : 2d array
        The original ranks of the values in ``data``.
    average_col : 1d array
        The average of each row of data.
    tied_rank_idx : list of list of numeric
        The indices that separate runs of ranks that have the same values.
    tie : {'lowest', 'average'}, optional
        Pass ``'lowest'`` to set all tied entries to the value of the lowest
        rank. Pass ``'average'`` to set all tied entries to the average value
        across the tied ranks.

    Returns
    -------
    list of 1d array
        The normalized data.
    """
    sorted_norm_data = np.zeros((len(data), len(data[0])))
    normed_data = []
    for col_num in range(0, len(data)):
        # find all values of shared rank
        for rank_num in range(len(tied_rank_idx[col_num])-1):
            start_idx = tied_rank_idx[col_num][rank_num]
            end_idx = tied_rank_idx[col_num][rank_num + 1]
            if tie == 'lowest':
                # give all of these entries the value of lowest rank
                sorted_norm_data[col_num][start_idx:end_idx] =\
                    average_col[start_idx]
            else:  # tie type is average
                # give all of these entries average value of their ranks
                average_val = np.mean(average_col[start_idx:end_idx])
                sorted_norm_data[col_num][start_idx:end_idx] = average_val
        # sort data back to original order
        rev_sort_ord = np.array(ranks[col_num], dtype='int32')
        normed_data.append(sorted_norm_data[col_num][rev_sort_ord])

    return normed_data


def qnorm(data, tie='lowest', reference_index=None):
    """
    Quantile normalizes a data set.

    Parallelizable if ``data`` is a 2d np.ndarray; see
    ``lib5c.algorithms.qnorm.qnorm_parallel()``.

    Parameters
    ----------
    data : 2d numeric structure, or dict of 1d numeric structure
        Anything that can be cast to array. Should be passed as row-major.
        Quantile normalization will be performed on the columns of ``data``.
    tie : {'lowest', 'average'}, optional
        Pass ``'lowest'`` to set all tied entries to the value of the lowest
        rank. Pass ``'average'`` to set all tied entries to the average value
        across the tied ranks.
    reference_index : int or str, optional
        If ``data`` is a row-major array, pass a column index to serve as a
        reference distribution. If ``data`` is a dict, pass a key of that
        dict that should serve as the reference distribution. Pass None to
        use the average of all distributions as the target distribution.

    Returns
    -------
    2d numpy array, or dict of 1d numpy array
        The quantile normalized data. If ``data`` was passed as a dict, then
        a dict with the same keys is returned.

    Notes
    -----
    This function is nan-safe. As long as each column of the input data
    contains the same number of nan's, nan's will only get averaged with
    other nan's, and they will get substituted back into their original
    positions. See the Examples section for an example of this.

    Examples
    --------
    >>> import numpy as np
    >>> from lib5c.algorithms.qnorm import qnorm
    >>> qnorm(np.array([[5, 4, 3],
    ...                 [2, 1, 4],
    ...                 [3, 4, 6],
    ...                 [4, 2, 8]]))
    array([[5.66666667, 4.66666667, 2.        ],
           [2.        , 2.        , 3.        ],
           [3.        , 4.66666667, 4.66666667],
           [4.66666667, 3.        , 5.66666667]])
    >>> qnorm(np.array([[     5, np.nan,      3],
    ...                 [     2,      1,      4],
    ...                 [np.nan,      4,      6],
    ...                 [     4,      2, np.nan]]))
    array([[5.        ,        nan, 2.        ],
           [2.        , 2.        , 3.33333333],
           [       nan, 5.        , 5.        ],
           [3.33333333, 3.33333333,        nan]])
    >>> qnorm(np.array([[     5, np.nan,      3],
    ...                 [     2,      1,      4],
    ...                 [np.nan,      4,      6],
    ...                 [     4,      2, np.nan]]), reference_index=1)
    array([[ 4., nan,  1.],
           [ 1.,  1.,  2.],
           [nan,  4.,  4.],
           [ 2.,  2., nan]])
    >>> res = qnorm({'A': [5, 2, 3, 4],
    ...              'B': [4, 1, 4, 2],
    ...              'C': [3, 4, 6, 8]})
    >>> list(sorted(res.items()))
    [('A', array([5.66666667, 2.        , 3.        , 4.66666667])),
     ('B', array([4.66666667, 2.        , 4.66666667, 3.        ])),
     ('C', array([2.        , 3.        , 4.66666667, 5.66666667]))]
    >>> res = qnorm({'A': [5, 2, 3, 4],
    ...              'B': [4, 1, 4, 2],
    ...              'C': [3, 4, 6, 8]}, reference_index='C')
    >>> list(sorted(res.items()))
    [('A', array([8., 3., 4., 6.])), ('B', array([6., 3., 6., 4.])),
     ('C', array([3., 4., 6., 8.]))]
    >>> res = qnorm({'A': [5, 2, 3, 4],
    ...              'B': [4, 1, 4, 2],
    ...              'C': [3, 4, 6, 8]}, reference_index='C', tie='average')
    >>> list(sorted(res.items()))
    [('A', array([8., 3., 4., 6.])), ('B', array([7., 3., 7., 4.])),
     ('C', array([3., 4., 6., 8.]))]
    """
    # handle input types
    if isinstance(data, dict):
        # convert dict to array, storing replicate names for later; each
        # replicate becomes one row, matching what _rank_data() expects
        replicate_order = list(data.keys())
        data_array = np.array([data[rep] for rep in replicate_order])
        # translate the reference key into a row index
        if reference_index is not None:
            reference_index = replicate_order.index(reference_index)
    else:
        # make sure we have a numpy array otherwise; transpose so that
        # replicates (input columns) become rows
        data_array = np.array(data, dtype=float).T

    # quantile normalize
    sorted_data, ranks, tied_rank_idx = _rank_data(data_array)
    average_col = _average_rows(sorted_data, reference_index=reference_index)
    normed_data = _sub_in_normed_val(sorted_data, ranks, average_col,
                                     tied_rank_idx, tie)

    # handle output types
    if isinstance(data, dict):
        # if data was a dict then return a dict keyed by replicate name
        normed_data = np.array(normed_data, dtype=float).T
        normed_data = {replicate_order[i]: normed_data[:, i]
                       for i in range(len(replicate_order))}
    else:
        # coerce the output to a numpy array, transposing back to the
        # caller's row-major orientation
        normed_data = np.array(normed_data, dtype=float).T
    return normed_data
# qnorm() wrapped with the parallelize_regions decorator (see
# lib5c.util.parallelization) so it can be applied across many regions at
# once; presumably it maps a dict of per-region matrices in parallel --
# confirm the exact call signature against parallelize_regions
qnorm_parallel = parallelize_regions(qnorm)
def qnorm_fast(data, reference_index=None):
    """
    Quantile normalizes a data set.

    Simpler, faster implementation compared to ``lib5c.algorithms.qnorm()``,
    but only supports ``tie='lowest'`` behavior and only takes an
    ``np.ndarray`` as input.

    This approach was developed and timed in `this repository
    <https://bitbucket.org/creminslab/qnorm-sandbox/>`_.

    Parallelizable if ``data`` is a 2d np.ndarray; see
    ``lib5c.algorithms.qnorm.qnorm_fast_parallel()``.

    Parameters
    ----------
    data : np.ndarray
        Two dimensional, with the columns representing the replicates to be
        qnormed. Quantile normalization will be performed on the columns of
        ``data``.
    reference_index : int or str, optional
        Pass a column index to serve as a reference distribution. Pass None
        to use the average of all distributions as the target distribution.

    Returns
    -------
    np.ndarray
        The quantile normalized data.

    Notes
    -----
    This function is nan-safe. As long as each column of the input data
    contains the same number of nan's, nan's will only get averaged with
    other nan's, and they will get substituted back into their original
    positions. See the Examples section for an example of this.

    Examples
    --------
    >>> import numpy as np
    >>> from lib5c.algorithms.qnorm import qnorm_fast
    >>> qnorm_fast(np.array([[5, 4, 3],
    ...                      [2, 1, 4],
    ...                      [3, 4, 6],
    ...                      [4, 2, 8]]))
    array([[5.66666667, 4.66666667, 2.        ],
           [2.        , 2.        , 3.        ],
           [3.        , 4.66666667, 4.66666667],
           [4.66666667, 3.        , 5.66666667]])
    >>> qnorm_fast(np.array([[     5, np.nan,      3],
    ...                      [     2,      1,      4],
    ...                      [np.nan,      4,      6],
    ...                      [     4,      2, np.nan]]))
    array([[5.        ,        nan, 2.        ],
           [2.        , 2.        , 3.33333333],
           [       nan, 5.        , 5.        ],
           [3.33333333, 3.33333333,        nan]])
    >>> qnorm_fast(np.array([[     5, np.nan,      3],
    ...                      [     2,      1,      4],
    ...                      [np.nan,      4,      6],
    ...                      [     4,      2, np.nan]]), reference_index=1)
    array([[ 4., nan,  1.],
           [ 1.,  1.,  2.],
           [nan,  4.,  4.],
           [ 2.,  2., nan]])
    """
    # per-column 'min'-method ranks plus the argsort order for each column
    ranks, sorters = zip(*[rankdata_plus(data[:, i], method='min')
                           for i in range(data.shape[1])])
    # column-sorted copy of the data
    workspace = np.array([data[:, i][sorters[i]]
                          for i in range(data.shape[1])], dtype=float).T
    if reference_index is not None:
        # target distribution is the reference column's sorted values; copy
        # because workspace is overwritten below
        averages = np.copy(workspace[:, reference_index])
    else:
        # target distribution is the row-wise mean of the sorted columns
        averages = np.mean(workspace, axis=1)
    # substitute target values back via the 1-based 'min' ranks, which ties
    # every equal entry to the value of its lowest rank
    for i in range(data.shape[1]):
        workspace[:, i] = averages[ranks[i] - 1]
    return workspace
# qnorm_fast() wrapped with the parallelize_regions decorator (see
# lib5c.util.parallelization) so it can be applied across many regions at
# once; presumably it maps a dict of per-region matrices in parallel --
# confirm the exact call signature against parallelize_regions
qnorm_fast_parallel = parallelize_regions(qnorm_fast)
def qnorm_counts_superdict(counts_superdict, primermap, tie='lowest',
                           regional=False, condition_on=None, reference=None):
    """
    Convenience function for quantile normalizing a counts superdict data
    structure.

    Parameters
    ----------
    counts_superdict : Dict[Dict[np.ndarray]]
        The keys of the outer dict are replicate names, the keys of the
        inner dict are region names, the values are square symmetric arrays
        of counts for the specified replicate and region.
    primermap : Dict[str, List[Dict[str, Any]]]
        The primermap describing the loci whose interaction counts are
        described in the ``counts_superdict``.
    tie : {'lowest', 'average'}
        Pass ``'lowest'`` to set all tied entries to the value of the lowest
        rank. Pass ``'average'`` to set all tied entries to the average
        value across the tied ranks.
    regional : bool
        Pass True to quantile normalize regions separately. Pass False to
        quantile normalize all regions together.
    condition_on : Optional[str]
        Pass a string key into the inner dicts of ``primermap`` to condition
        on that quantity. Current limitations: only works with
        ``regional=True`` and can only condition with exact equality (does
        not support conditioning on strata of a quantity). Pass None to not
        do conditional quantile normalization.
    reference : Optional[str]
        Pass a string key into the ``counts_superdict`` to indicate a
        replicate that should be used as a reference distribution to
        quantile normalize to.

    Returns
    -------
    Dict[Dict[np.ndarray]]
        The keys of the outer dict are replicate names, the keys of the
        inner dict are region names, the values are square symmetric arrays
        of the quantile normalized counts for the specified replicate and
        region.
    """
    # determine rep order and region order
    rep_order = list(counts_superdict.keys())
    region_order = list(counts_superdict[rep_order[0]].keys())
    # translate the reference replicate name into a column index
    ref_index = rep_order.index(reference) if reference is not None else None

    if regional:
        if condition_on is None:
            # construct matrices for each region, one replicate per column
            matrices = {
                region: np.array(
                    [flatten_regional_counts(counts_superdict[rep][region])
                     for rep in rep_order]).T
                for region in region_order}
            # propagate nan's: if any replicate is non-finite at a position,
            # mask that position in every replicate so counts stay aligned
            for region in region_order:
                matrices[region][(~np.isfinite(matrices[region])).any(axis=1)] \
                    = np.nan
            # qnorm matrices; the fast path only supports tie='lowest'
            if tie == 'lowest':
                qnormed_matrices = qnorm_fast_parallel(
                    matrices, reference_index=ref_index)
            else:
                qnormed_matrices = qnorm_parallel(
                    matrices, tie=tie, reference_index=ref_index)
            # unpack qnormed data back into the superdict structure
            qnormed_counts_superdict = {
                rep_order[i]: {
                    region: unflatten_regional_counts(
                        qnormed_matrices[region][:, i])
                    for region in region_order}
                for i in range(len(rep_order))}
        else:
            # collect locus property information for each region
            properties = {region: np.array(
                [primermap[region][i][condition_on]
                 for i in range(len(primermap[region]))])
                for region in region_order}
            unique_values = np.unique(np.concatenate(
                [properties[region] for region in region_order]))
            # enforce non-inplace operation
            qnormed_counts_superdict = {
                rep: {region: np.copy(counts_superdict[rep][region])
                      for region in region_order}
                for rep in rep_order}
            # overwrite with conditionally qnormed values, one (value_i,
            # value_j) property combination at a time
            for i in range(len(unique_values)):
                for j in range(i + 1):
                    for region in region_order:
                        # symmetric selector for loci pairs with this
                        # combination of property values
                        sel = np.outer(
                            properties[region] == unique_values[i],
                            properties[region] == unique_values[j]
                        )
                        sel = sel | sel.T
                        if np.sum(sel):
                            matrix = np.array(
                                [qnormed_counts_superdict[rep][region][sel]
                                 for rep in rep_order]).T
                            # propagate nan's across replicates
                            matrix[(~np.isfinite(matrix)).any(axis=1)] = np.nan
                            if tie == 'lowest':
                                qnormed_matrix = qnorm_fast(
                                    matrix, reference_index=ref_index)
                            else:
                                qnormed_matrix = qnorm(
                                    matrix, tie=tie, reference_index=ref_index)
                            for k in range(len(rep_order)):
                                qnormed_counts_superdict[rep_order[k]][region][
                                    sel] = qnormed_matrix[:, k]
    else:
        # construct matrix spanning all regions, one replicate per column
        matrix = counts_superdict_to_matrix(
            counts_superdict, rep_order=rep_order,
            region_order=region_order).T
        # propagate nan's across replicates
        matrix[(~np.isfinite(matrix)).any(axis=1)] = np.nan
        # qnorm matrix; the fast path only supports tie='lowest'
        if tie == 'lowest':
            qnormed_matrix = qnorm_fast(matrix, reference_index=ref_index)
        else:
            qnormed_matrix = qnorm(matrix, tie=tie, reference_index=ref_index)
        # unpack qnormed data back into the superdict structure
        qnormed_counts_superdict = {
            rep_order[i]: unflatten_counts_from_list(qnormed_matrix[:, i],
                                                     region_order, primermap)
            for i in range(len(rep_order))}
    return qnormed_counts_superdict