Source code for lib5c.tools.spline
from lib5c.tools.parents import level_parser, simple_in_out_parser, \
parallelization_parser, primerfile_parser
[docs]def add_spline_tool(parser):
spline_parser = parser.add_parser(
'spline',
prog='lib5c spline',
help='spline normalization',
parents=[primerfile_parser, level_parser, simple_in_out_parser,
parallelization_parser]
)
spline_parser.add_argument(
'-J', '--joint_normalize',
action='store_true',
help='''Normalize all input files using a single shared bias model.''')
spline_parser.add_argument(
'-b', '--bias_factors',
type=str,
default='(GC,length)',
help='''The list of bias factors to fit splines for. The default is
"(GC,length)".''')
spline_parser.add_argument(
'-k', '--knots',
type=str,
default='(0,10)',
help='''The number of knots to put in the splines. Pass a list of the
form "(k_1,k_2,k_3)" to use a different number of knots for the
different bias factors. If a bias factor is discrete, pass 0 for its
knot number to use an empirical discrete surface instead of a spline.
The default is '(0,10)'.''')
spline_parser.add_argument(
'-e', '--epsilon',
type=float,
default=1e-4,
help='''The maximum relative change in each bias model between
successive iterations before declaring convergence. The default is
1e-4.''')
spline_parser.add_argument(
'-i', '--max_iterations',
type=int,
default=100,
help='''Maximum number of iterations. The default is 100.''')
spline_parser.add_argument(
'-o', '--model_outfile',
type=str,
help='''Pass a string reference to a path to which to write the splines
as a pickle.''')
spline_parser.add_argument(
'-m', '--expected_model',
type=str,
help='''Pass a string reference to a countsfile containing the expected
model to be used. If this is not passed, a linear log-log fit will be
used.''')
spline_parser.add_argument(
'-U', '--unlog',
action='store_true',
help='''Pass this flag to fit the splines on unlogged counts. By
default, they will be fit to logged counts.''')
spline_parser.add_argument(
'-A', '--asymmetric',
action='store_true',
help='''Pass this flag to consider only the upper triangular entries of
the contact matrices.''')
spline_parser.set_defaults(func=spline_tool)
[docs]def spline_tool(parser, args):
import glob
import pickle
from lib5c.tools.helpers import resolve_parallel, infer_replicate_names, \
resolve_primerfile, resolve_expected_models
from lib5c.parsers.primers import load_primermap
from lib5c.parsers.counts import load_counts
from lib5c.writers.counts import write_counts
from lib5c.algorithms.spline_normalization import \
iterative_spline_normalization
from lib5c.util.system import check_outdir
# resolve parallel and primerfile
primerfile = resolve_primerfile(args.infile, args.primerfile)
if not args.joint_normalize:
resolve_parallel(parser, args, subcommand='spline')
# load counts
print('loading counts')
infiles = glob.glob(args.infile)
primermap = load_primermap(primerfile)
observed_counts = [load_counts(infile, primermap) for infile in infiles]
# resolve expected model
expected_counts = resolve_expected_models(
args.expected_model, observed_counts, primermap, level=args.level)
# resolve bias factors
resolved_bias_factors = list(map(
str.strip, args.bias_factors.strip('()').split(',')))
# resolve knots
try:
resolved_knots = int(args.knots)
except ValueError:
resolved_knots = list(map(int, args.knots.strip('()').split(',')))
# fit splines
print('fitting splines')
splines, _, corrected = iterative_spline_normalization(
observed_counts, expected_counts, primermap, resolved_bias_factors,
max_iter=args.max_iterations, eps=args.epsilon, knots=resolved_knots,
log=not args.unlog, asymmetric=args.asymmetric)
# save splines
if args.model_outfile is not None:
check_outdir(args.model_outfile)
with open(args.model_outfile, 'wb') as handle:
pickle.dump(splines, handle)
# write counts
print('writing counts')
if not args.joint_normalize:
write_counts(corrected[0], args.outfile, primermap)
else:
replicate_names = infer_replicate_names(
infiles, pattern=args.infile if '*' in args.infile else None)
outfiles = [args.outfile.replace(r'%s', rep) for rep in replicate_names]
for i in range(len(corrected)):
write_counts(corrected[i], outfiles[i], primermap)