Source code for lib5c.parsers.counts

"""
Module for parsing .counts files.
"""

from math import sqrt

import numpy as np

from lib5c.parsers.primer_names import default_primer_parser, default_bin_parser
from lib5c.util.primers import determine_region_order, aggregate_primermap
from lib5c.parsers.util import parse_field


[docs]def set_nans(counts, primermap): """ Sets nan's in counts dict for ligations considered impossible according to a primermap with strand information. Parameters ---------- counts : Dict[str, np.ndarray] The keys are the region names. The values are the arrays of counts values for that region. These arrays are square and symmetric. primermap : Dict[str, List[Dict[str, Any]]] The keys of the outer dict are region names. The values are lists, where the :math:`i` th entry represents the :math:`i` th primer in that region. Primers are represented as dicts with the following structure:: { 'chrom' : str, 'start' : int, 'end' : int, 'strand' : '+' or '-' } See ``lib5c.parsers.primers.get_primermap()``. Notes ----- If the primermap passed has no strand information, this function will do nothing. This function operates in-place. """ for region in counts.keys(): for i in range(len(primermap[region])): for j in range(i+1): if ('strand' in list(primermap[region][i].keys()) and 'strand' in list(primermap[region][j].keys())): if primermap[region][i]['strand'] == \ primermap[region][j]['strand']: counts[region][i, j] = np.nan counts[region][j, i] = np.nan
[docs]def set_cis_trans_nans(counts, aggregated_primermap): """ Sets nan's in a complete cis and trans counts matrix for ligations considered impossible according to a primermap with strand information. Parameters ---------- counts : np.ndarray Square, symmetric array storing the complete cis and trans counts, with the regions arranged as implied by the ``aggregated_primermap`` aggregated_primermap : List[Dict[str, Any]] The dicts in the lists represent primers, equal in number and order to the side length of the counts matrix, and have the following structure:: { 'chrom' : str, 'start' : int, 'end' : int, 'strand' : '+' or '-' } See ``lib5c.parsers.primers.get_primermap()`` and ``lib5c.util.primers.aggregate_primermap()``. Notes ----- If the aggregated primermap passed has no strand information, this function will do nothing. This function operates in-place. """ for i in range(len(aggregated_primermap)): for j in range(i+1): if ('strand' in list(aggregated_primermap[i].keys()) and 'strand' in list(aggregated_primermap[j].keys())): if aggregated_primermap[i]['strand'] == \ aggregated_primermap[j]['strand']: counts[i, j] = np.nan counts[j, i] = np.nan
[docs]def load_cis_trans_counts(countsfile, primermap, name_parser=default_primer_parser, force_nan='always', region_order=None): """ Loads the counts values from a primer-primer pair .counts file into a single square, symmetric array, and returns it. Parameters ---------- countsfile : str String reference to location of .counts file to load counts from. primermap : Dict[str, List[Dict[str, Any]]] The keys of the outer dict are region names. The values are lists, where the :math:`i` th entry represents the :math:`i` th primer in that region. Primers are represented as dicts with the following structure:: { 'chrom' : str, 'start' : int, 'end' : int } See ``lib5c.parsers.primers.get_primermap()``. name_parser : Optional[Callable[[str], Dict[str, Any]]] Function that takes in the primer names in the countsfile and returns a dict containing key-value pairs containing information required to identify the primer. At a minimum, this dict must have the following structure:: { 'region': str } This information is necessary to deduce what region a given primer in the countsfile belongs to. force_nan : Optional[str] If 'always' is passed and if the primermap contains strand information, impossible ligations will be always set to nan. If 'implicit' is passed, impossible ligations will be set to nan when implied by the strand information in the primermap, but not when the ligations are explicitly present in the countsfile. If 'never' is passed, strand information will be ignored and impossible ligations will not be identified. region_order : Optional[List[str]] If passed, this list will be used to determine the order in which the regions will be concatenated in. If not passed, the regions will be concatenated in order of genomic coordinate. Returns ------- np.ndarray The square, symmetric array of counts. """ # determine region order if not specified if region_order is None: region_order = determine_region_order(primermap) # aggregate primermap aggregated_primermap = aggregate_primermap(primermap, region_order) # create reverse lookup table for aggregated primermap reverse_aggregated_primermap = {aggregated_primermap[i]['name']: i for i in range(len(aggregated_primermap))} # initialize counts array counts = np.zeros([len(aggregated_primermap), len(aggregated_primermap)]) # set nan's for 'implicit' mode if force_nan == 'implicit': set_cis_trans_nans(counts, aggregated_primermap) # parse countsfile with open(countsfile, 'r') as handle: for line in handle: # skip comments if line.startswith('#'): continue # parse line information name1 = line.strip().split('\t')[0] name2 = line.strip().split('\t')[1] value = float(line.strip().split('\t')[2]) region1 = name_parser(name1)['region'] region2 = name_parser(name1)['region'] # skip unrecognized regions if (region1 not in list(primermap.keys()) or region2 not in list(primermap.keys())): continue # identify indices within aggregated primermap index1 = None if name1 in reverse_aggregated_primermap: index1 = reverse_aggregated_primermap[name1] index2 = None if name2 in reverse_aggregated_primermap: index2 = reverse_aggregated_primermap[name2] # skip unrecognized primers if index1 is None or index2 is None: continue # record value counts[index1, index2] = value counts[index2, index1] = value # set nan's for 'always' mode if force_nan == 'always': set_cis_trans_nans(counts, aggregated_primermap) return counts
[docs]def load_counts(countsfile, primermap, force_nan='always', dtype=float): """ Loads the counts values from a primer-primer pair .counts file into square, symmetric arrays and returns them. Parameters ---------- countsfile : str String reference to location of .counts file to load counts from. primermap : Dict[str, List[Dict[str, Any]]] The keys of the outer dict are region names. The values are lists, where the :math:`i` th entry represents the :math:`i` th primer in that region. Primers are represented as dicts with the following structure:: { 'chrom' : str, 'start' : int, 'end' : int } See ``lib5c.parsers.primers.load_primermap()``. force_nan : Optional[str] If 'always' is passed and if the primermap contains strand information, impossible ligations will be always set to nan. If 'implicit' is passed, impossible ligations will be set to nan when implied by the strand information in the primermap, but not when the ligations are explicitly present in the countsfile. If 'never' is passed, strand information will be ignored and impossible ligations will not be identified. dtype : {int, float} Sets the dtype for the matrix. If the value column contains strings this will be ignored and the dtype will be set to 'U25'. Returns ------- Dict[str, np.ndarray] The keys are the region names. The values are the arrays of counts values for that region. These arrays are square and symmetric. """ try: counts = dict(np.load(countsfile)) try: for region in counts: np.testing.assert_allclose(counts[region], counts[region].T, equal_nan=True) except AssertionError: raise AssertionError('input .npz contains non-symmetric matrices') except (IOError, ValueError): # half-hearted dtype inference for strings value_cast_fn = dtype with open(countsfile, 'r') as handle: for line in handle: if line.startswith('#'): continue field_value = parse_field(line.split('\t')[2]) if field_value == np.nan: continue if type(field_value) == str: def value_cast_fn(x): return str(x).strip() dtype = 'U25' break # initialize counts arrays counts = {} for region in primermap.keys(): counts[region] = np.zeros([len(primermap[region]), len(primermap[region])], dtype=dtype) # set nan's for 'implicit' mode if force_nan == 'implicit': set_nans(counts, primermap) # create reverse lookup table reverse_map = {primermap[region][index]['name']: (region, index) for region in primermap.keys() for index in range(len(primermap[region]))} # parse countsfile with open(countsfile, 'r') as handle: for line in handle: # skip comments if line.startswith('#'): continue # parse line information name1 = line.strip().split('\t')[0] name2 = line.strip().split('\t')[1] value = value_cast_fn(line.strip('\n ').split('\t')[2]) # skip unrecognized primers if name1 not in reverse_map or name2 not in reverse_map: continue # identify indices within region region1, index1 = reverse_map[name1] region2, index2 = reverse_map[name2] # skip trans interactions if not region1 == region2: continue # record value counts[region1][index1, index2] = value counts[region1][index2, index1] = value # set nan's for 'always' mode if force_nan == 'always': set_nans(counts, primermap) return counts
[docs]def load_counts_legacy(countsfile, name_parser=default_bin_parser, pixelmap=None): """ Loads the counts values from a binned .counts file into square, symmetric arrays and returns them. Parameters ---------- countsfile : str String reference to location of .counts file to load counts from. name_parser : Optional[Callable[[str], Dict[str, Any]]] Function that takes in the bin name column of the countsfile and returns a dict containing key-value pairs containing information required to identify the bin. At a minimum, this dict must have the following structure:: { 'region': str, 'index': int } This information is necessary to deduce what region a given bin in the countsfile belongs to. The index key is optional, but recommended. If present, its value should be the zero-based index of the bin within the region. If not present, the pixelmap will be searched to identify the bin index. pixelmap : Optional[Dict[str, List[Dict[str, Any]]]] The keys of the outer dict are region names. The values are lists, where the :math:`i` th entry represents the :math:`i` th bin in that region. Bins are represented as dicts with the following structure:: { 'chrom': str, 'start': int, 'end' : int, 'name' : str } See ``lib5c.parsers.get_pixelmap()``. The pixelmap is used to identify the index of a bin within a region. If ``name_parser`` returns an index key, you can pass ``None`` here since the index will be determined from the bin name. Returns ------- Dict[str, np.ndarray] The keys are the region names. The values are the arrays of counts values for that region. These arrays are square and symmetric. Notes ----- This function casts the counts values in the countsfile to floats, so it will work even if the countsfile actually contains pseudocounts or other non-integer values. """ # dict to store the unshaped count information unshaped_counts = {} # parse countsfile with open(countsfile, 'r') as handle: for line in handle: # split line on tab pieces = line.strip().split('\t') # parse bin names name1_fields = name_parser(pieces[0]) name2_fields = name_parser(pieces[1]) # deduce region region = name1_fields['region'] # deduce bin1 index bin1 = None # easy way: use index returned by name_parser if 'index' in name1_fields.keys(): bin1 = name1_fields['index'] # hard way: search pixelmap for bin with this name else: for i in range(len(pixelmap[region])): if pixelmap[region][i]['name'] == pieces[0]: bin1 = i # deduce bin2 index bin2 = None # easy way: use index returned by name_parser if 'index' in name2_fields.keys(): bin2 = name2_fields['index'] # hard way: search pixelmap for bin with this name else: for i in range(len(pixelmap[region])): if pixelmap[region][i]['name'] == pieces[1]: bin2 = i # parse value value = float(pieces[2]) # add region to dict if this is a new one if region not in unshaped_counts: unshaped_counts[region] = [] # add this line to the list unshaped_counts[region].append({'bin1' : bin1, 'bin2' : bin2, 'value': value}) # dict to store 2D arrays counts = {} for region in unshaped_counts: # determine the size of the 2D array we should create for this region # this should be the inverse function of (n**2 - n)/2 + n size = int(0.5 * (-1 + sqrt(8*len(unshaped_counts[region]) + 1))) # initialize this new 2D array counts[region] = np.zeros([size, size]) # fill in this array for unshaped_count in unshaped_counts[region]: counts[region][unshaped_count['bin1'], unshaped_count['bin2']] = \ unshaped_count['value'] counts[region][unshaped_count['bin2'], unshaped_count['bin1']] = \ unshaped_count['value'] return counts
[docs]def load_counts_by_name(countsfile, name_list=None, primermap=None, locusmap=None, force_nan='always', region_order=None): """ Loads the counts values from any .counts file into a single square, symmetric array, and returns it. Parameters ---------- countsfile : str String reference to location of .counts file to load counts from. name_list : Optional[List[str]] Ordered list of locus names as strings. primermap : Optional[Dict[str, List[Dict[str, Any]]]] The keys of the outer dict are region names. The values are lists, where the ith entry represents the ith primer in that region. Primers are represented as dicts with the following structure:: { 'chrom' : str, 'start' : int, 'end' : int } See lib5c.parsers.primers.get_primermap(). locusmap : Optional[LocusMap] Locus information as a LocusMap object. force_nan : Optional[str] If 'always' is passed and if the primermap contains strand information, impossible ligations will be always set to nan. If 'implicit' is passed, impossible ligations will be set to nan when implied by the strand information in the primermap, but not when the ligations are explicitly present in the countsfile. If 'never' is passed, strand information will be ignored and impossible ligations will not be identified. region_order : Optional[List[str]] If passed, this list will be used to determine the order in which the regions will be concatenated in. If not passed, the regions will be concatenated in order of genomic coordinate. If name_list is passed, this kwarg is ignored. Returns ------- np.ndarray The square, symmetric array of counts. """ # determine region order if not specified if not name_list and region_order is None: if primermap: region_order = determine_region_order(primermap) elif locusmap: region_order = locusmap.get_regions() # aggregate primermap aggregated_primermap = None if primermap: aggregated_primermap = aggregate_primermap(primermap, region_order) # create reverse lookup table if primermap: reverse_map = {aggregated_primermap[i]['name']: i for i in range(len(aggregated_primermap))} elif name_list: reverse_map = {name_list[index]: index for index in range(len(name_list))} elif locusmap: reverse_map = {locusmap.locus_list[index].get_name(): index for index in range(locusmap.size())} # deduce size if primermap: size = len(aggregated_primermap) elif name_list: size = len(name_list) elif locusmap: size = locusmap.size() else: raise ValueError('could not determine size of matrix') # initialize counts array counts = np.zeros([size, size]) # set nan's for 'implicit' mode if force_nan == 'implicit': if primermap: set_cis_trans_nans(counts, aggregated_primermap) elif locusmap: set_cis_trans_nans(counts, locusmap.as_list_of_dict()) # parse countsfile with open(countsfile, 'r') as handle: for line in handle: # skip comments if line.startswith('#'): continue # parse line information name1 = line.strip().split('\t')[0] name2 = line.strip().split('\t')[1] value = float(line.strip().split('\t')[2]) # identify indices within aggregated primermap index1 = None if name1 in reverse_map: index1 = reverse_map[name1] index2 = None if name2 in reverse_map: index2 = reverse_map[name2] # skip unrecognized primers if index1 is None or index2 is None: continue # record value counts[index1, index2] = value counts[index2, index1] = value # set nan's for 'always' mode if force_nan == 'always': if primermap: set_cis_trans_nans(counts, aggregated_primermap) elif locusmap: set_cis_trans_nans(counts, locusmap.as_list_of_dict()) return counts
# test client
[docs]def main(): from lib5c.parsers.primers import load_primermap pixelmap = load_primermap('test/bins.bed') counts = load_counts('test/test.counts', pixelmap) print(counts.keys()) print(len(counts['Klf4'])) print(len(counts['Klf4'][0])) print(counts['Klf4'][0][0]) print(counts['Klf4'][1][0]) print(counts['Klf4'][0][1]) counts = load_counts('test/peaks.counts', pixelmap) print(counts.keys()) print(len(counts['Klf4'])) print(len(counts['Klf4'][0])) print(counts['Klf4'][0][0]) print(counts['Klf4'][1][0]) print(counts['Klf4'][0][1]) primermap = load_primermap('test/primers.bed') counts = load_counts('test/test_raw.counts', primermap) print(len(counts['Klf4'])) print(len(counts['Klf4'][0])) print(counts['Klf4'][75][75]) print(counts['Klf4'][75][142]) print(counts['Klf4'][142][75])
if __name__ == "__main__": main()