Source code for lib5c.tools.helpers

import os
import glob
import subprocess
import sys

try:
    from bsub import bsub
    from bsub.bsub import BSubJobNotFound

    bsub_avail = True
except ImportError:
    bsub_avail = False

from lib5c.util.system import shell_quote


[docs]def infer_replicate_names(infiles, as_dict=False, pattern=None): """ Infers replicate names given a list of filenames. Parameters ---------- infiles : list of str The filenames to consider. as_dict : bool Pass True to make this function return a dict mapping the the infiles to their inferred replicate names. pattern : str, optional If the infiles are glob-based matches to a patten containing one wildcard, pass that pattern to use it to reconstruct the replicate names. Returns ------- list of str or dict If as_dict is False (the default), this is just the list of inferred rep names, in the same order as infiles. If as_dict is True, this is a dict mapping the original infiles to their inferred replicate names. """ if pattern: prefix, postfix = pattern.split('*') if as_dict: return {infile: infile[len(prefix):-len(postfix)] for infile in infiles} return [infile[len(prefix):-len(postfix)] for infile in infiles] common_prefix = os.path.commonprefix(infiles) common_postfix = os.path.commonprefix( [infile[::-1] for infile in infiles])[::-1] if len(infiles) == 1: common_prefix = '' common_postfix = '' if as_dict: return { infile: infile[len(common_prefix):len(infile)-len(common_postfix)] for infile in infiles } return [infile[len(common_prefix):len(infile)-len(common_postfix)] for infile in infiles]
[docs]def infer_level_mapping(rep_names, triggers): """ Infers a mapping from replicate names to level names (i.e., classes or conditions) using a simple trigger substring approach. A replicate is assigned to a level if the level's trigger substring is a substring of the replicate name. Parameters ---------- rep_names : list of str The replicate names to assign levels to. triggers : dict or list of str Pass a dict mapping trigger substrings to level names, or pass a list of level names to use the level names as their own trigger substrings. Returns ------- dict A mapping from rep_names to level names. """ # resolve triggers if type(triggers) is not dict: triggers = {trigger: trigger for trigger in triggers} levels = {} for rep_name in rep_names: target_level = None for t in triggers: if t in rep_name: target_level = triggers[t] print('assigning rep %s to class %s' % (rep_name, target_level)) break if target_level is None: raise ValueError('could not assign replicate %s to any of the ' 'color-coding classes %s' % (rep_name, list(triggers.values()))) else: levels[rep_name] = target_level return levels
def _determine_flags(parser, args): args_dict = vars(args) args_string = '' for action in parser._actions: if action.dest == 'help' or r'%s' in args_dict[action.dest]: continue if len(action.option_strings) > 0: args_string += ' --%s %s' % (action.dest, args_dict[action.dest]) return args_string def _parallelize_flags(parser, args, subcommand='', key_arg='infile'): # turn args into a dict and fish out the key argument args_dict = vars(args) key_arg_value = args_dict[key_arg].strip('\'"') # glob for infiles infiles = glob.glob(key_arg_value) # infer replicate names replicate_names = infer_replicate_names( infiles, pattern=key_arg_value if '*' in key_arg_value else None) # construct the args_strings args_strings = [] for i in range(len(infiles)): rep = replicate_names[i] infile = infiles[i] args_string = '' for action in parser._actions: # print action.dest # skip help flag if action.dest in ['help', 'version']: continue # recurse into subparsers if action.dest == '==SUPPRESS==': # print 'recursing into' # print action if not subcommand: subcommand = list(action.choices.keys())[0] if ' ' in subcommand: subcommand, subsubcommand = subcommand.split(' ', 1) return _parallelize_flags(action.choices[subcommand], args, subcommand=subsubcommand, key_arg=key_arg) return _parallelize_flags(action.choices[subcommand], args, key_arg=key_arg) # optional arguments if len(action.option_strings) > 0: # skip if the value is None if args_dict[action.dest] is None: continue # special logic for booleans, assumed to use store_true elif type(args_dict[action.dest]) == bool: if args_dict[action.dest]: args_string += ' --%s' % action.dest # standard optional arguments else: opt_value = args_dict[action.dest] if type(opt_value) == str: if r'%s' in args_dict[action.dest]: opt_value = opt_value.replace(r'%s', rep) args_string += ' --%s %s' % \ (action.dest, shell_quote(opt_value)) # positional arguments else: if action.dest == key_arg: args_string += ' %s' % infile else: pos_value = args_dict[action.dest] if type(pos_value) == str: if r'%s' in args_dict[action.dest]: pos_value = pos_value.replace(r'%s', rep) args_string += ' %s' % shell_quote(pos_value) args_strings.append(args_string) return args_strings
[docs]def resolve_parallel(parser, args, subcommand='', key_arg='infile', root_command='lib5c'): """ Parallelizes as a command via bsub if it is available. Parameters ---------- parser : argparse.ArgumentParser The parser used to parse the args for the root command. args : argparse.Namespace The args parsed by the parser. subcommand : str The particular subcommand of the root command being invoked. key_arg : str The argument to parallelize over. root_command : str The string used to invoke the root command. """ # resolve script name script_name = '%s %s' % (root_command, subcommand) # turn args into a dict args_dict = vars(args) # glob for infiles infiles = glob.glob(args_dict[key_arg].strip('\'"')) if len(infiles) > 1 or '*' in args_dict[key_arg]: args_strings = _parallelize_flags(parser, args, subcommand=subcommand, key_arg=key_arg) if bsub_avail: if not os.path.exists('logs'): os.makedirs('logs') try: job_ids = [] for arg_string in args_strings: sub = bsub( '.'.join([script_name.replace(' ', '.')] + ['%s_%s' % (k, v) for k, v in args_dict.items() if k not in ['func', 'infile', 'outfile', 'hang', 'primerfile']]), verbose=False ) command = '%s %s' % (script_name, arg_string) sub(command) job_ids.append(sub.job_id) if 'hang' in args_dict and args_dict['hang']: for job_id in job_ids: bsub.poll(job_id) except BSubJobNotFound: for arg_string in args_strings: command = '%s %s' % (script_name, arg_string) print(command) subprocess.call(command, shell=True) else: for arg_string in args_strings: command = '%s %s' % (script_name, arg_string) print(command) subprocess.call(command, shell=True) sys.exit()
[docs]def split_self_regionally(regions, script='lib5c', hang=False): """ Allows a command line script that accepts a --region flag to split itself into a separate command run for each region. Parameters ---------- regions : list of str The regions to split into. script : str The name of the script to invoke. hang : bool Pass True to cause the original executing process to hang until all the bsub jobs spawned by this function complete. This does nothing if bsub is not available. """ if not os.path.exists('logs'): os.makedirs('logs') n_tokens = len(script.split()) command_pattern = ' '.join([script] + [arg if arg.startswith('-') else shell_quote(arg) for arg in sys.argv[n_tokens:]] + ['--region %r']) if bsub_avail: try: job_ids = [] for region in regions: sub = bsub('.'.join(sys.argv[1:] + [region]), verbose=False) command = command_pattern.replace(r'%r', region) print(command) sub(command) job_ids.append(sub.job_id) if hang: for job_id in job_ids: bsub.poll(job_id) except BSubJobNotFound: for region in regions: command = command_pattern.replace(r'%r', region) print(command) subprocess.call(command, shell=True) else: for region in regions: command = command_pattern.replace(r'%r', region) print(command) subprocess.call(command, shell=True) sys.exit()
[docs]def resolve_primerfile(infile, primerfile=None): """ Searches for a primerfile next to in infile. Parameters ---------- infile : str or list of str The infile(s) to look next to. primerfile : str, optional If you already know where the primerfile is pass it here to skip the search. Returns ------- str The primerfile. """ if primerfile is not None: return primerfile # glob for infiles infiles = [] if type(infile) == str: infiles = glob.glob(infile.strip('\'"')) else: for f in infile: infiles.extend(glob.glob(f.strip('\'"'))) # try to find it try: return glob.glob( os.path.join(os.path.split(infiles[0])[0], '*.bed'))[0] except IndexError: raise IOError('missing primerfile or countsfile')
[docs]def resolve_level(primermap, level='auto'): """ Resolves the level of some input data. Parameters ---------- primermap : primermap Primermap to try to resolve the level of. level : str If you already know the level, you can pass it as a string here. Returns ------- str The resolved level. Notes ----- This function operates on a "three in a row" heuristic: if the first three bins in the primemap are all of the same size, then we guess that it's bin level data. """ if level != 'auto': return level region = list(primermap.keys())[0] lengths = [primermap[region][i]['end'] - primermap[region][i]['start'] for i in range(3)] if lengths[0] == lengths[1] and lengths[0] == lengths[2]: return 'bin' else: return 'fragment'
[docs]def resolve_expected_models(expected_model_string, observed_counts, primermap, level=None): """ Convenience helper for resolving expected models. Parameters ---------- expected_model_string : str If None, we expect to estimate fresh expected models from ``observed_counts``. If a path to a specific countsfile, we expect that it contains the expected model to be used for all the observed counts. If a glob-expandable path, we expect that each file matching the pattern is to be used for one of the observed counts (assuming they too are in glob order). observed_counts : list of dict of np.ndarray Each element in the list is one replicate, represented as a counts dict of observed values. primermap : primermap The primermap to use for parsing files, etc. level : {'bin', 'fragment'} The level to use if a fresh expected modeul must be estimated. Returns ------- list of dict of np.ndarray The resolved expected models. """ from lib5c.parsers.counts import load_counts from lib5c.algorithms.expected import \ make_poly_log_log_binned_expected_matrix, \ make_poly_log_log_fragment_expected_matrix if expected_model_string: expected_files = glob.glob(expected_model_string.strip('\'"')) if len(expected_files) == len(observed_counts): print('loading expected models') expected_counts = [load_counts(expected_file, primermap) for expected_file in expected_files] elif len(expected_files) == 1: print('loading expected model') expected_count = load_counts(expected_model_string, primermap) expected_counts = [expected_count] * len(observed_counts) else: raise ValueError('unexpected number of expected countsfiles') else: print('precomputing expected models') if resolve_level(primermap, level) == 'fragment': expected_counts = [make_poly_log_log_fragment_expected_matrix( observed_count, primermap) for observed_count in observed_counts] else: expected_counts = [make_poly_log_log_binned_expected_matrix( observed_count) for observed_count in observed_counts] return expected_counts