Source code for lib5c.tools.visualize_variance
from lib5c.tools.parents import lim_parser, primerfile_parser, var_parser
[docs]def add_visualize_variance_tool(parser):
visualize_variance_parser = parser.add_parser(
'visualize-variance',
prog='lib5c plot visualize-variance',
help='visualize variance estimates',
parents=[primerfile_parser, lim_parser, var_parser]
)
visualize_variance_parser.add_argument(
'observed',
type=str,
help='''File containing observed counts.''')
visualize_variance_parser.add_argument(
'expected',
type=str,
help='''File containing expected counts.''')
visualize_variance_parser.add_argument(
'outfile',
type=str,
help='''Filename to draw plot to.''')
visualize_variance_parser.add_argument(
'--x_unit',
type=str,
choices=['dist', 'exp'],
default='dist',
help='''The x-units of the plot. Default is 'dist'.''')
visualize_variance_parser.add_argument(
'--y_unit',
type=str,
choices=['disp', 'var'],
default='var',
help='''The y-units of the plot. Default is 'var'.''')
visualize_variance_parser.add_argument(
'--logx',
action='store_true',
help='''Pass to log the x-axis.''')
visualize_variance_parser.add_argument(
'--unlogy',
action='store_true',
help='''Pass to log the y-axis (default is to log it).''')
visualize_variance_parser.add_argument(
'-S', '--scatter',
action='store_true',
help='''Pass to plot a simple scatterplot (default is a hexbin
plot).''')
visualize_variance_parser.add_argument(
'-v', '--variance',
help='''Pass a countsfile of variance estimates to overlay the estimates
over the data.''')
visualize_variance_parser.add_argument(
'-X', '--adjust_x',
action='store_true',
help='''If -v/--variance is passed and was estimated with --x_unit exp,
but is now being visualized with --x_unit dist, pass this flag to use
the average disp at every dist to make the plotted estimate a function
of distance. This doesn't work in the other direction (--x_unit of the
fit was dist but is being visualized as exp).''')
visualize_variance_parser.add_argument(
'-Y', '--adjust_y',
action='store_true',
help='''If -v/--variance is passed and was estimated with --x_unit dist
and different --y_unit than it is being visualized with here, and
--x_unit is dist, pass this flag to use the average exp at each distance
to make the plotted estimate a function of distance. If variance was
estimated with --x_unit exp, you can change --y_unit without passing
this flag.''')
visualize_variance_parser.add_argument(
'-r', '--region',
type=str,
help='''Pass a region name as a string to visualize the variance for
only one region.''')
visualize_variance_parser.set_defaults(func=visualize_variance_tool)
[docs]def visualize_variance_tool(parser, args):
import glob
import numpy as np
from lib5c.tools.helpers import resolve_primerfile
from lib5c.parsers.primers import load_primermap
from lib5c.parsers.counts import load_counts
from lib5c.util.counts import flatten_and_filter_counts
from lib5c.algorithms.variance import estimate_variance
from lib5c.algorithms.variance.lognorm_dispersion import \
variance_to_dispersion as lognorm_var_to_disp, \
dispersion_to_variance as lognorm_disp_to_var, \
dispersion_to_variance_direct as lognorm_disp_to_var_direct
from lib5c.algorithms.variance.nbinom_dispersion import \
variance_to_dispersion as nbinom_var_to_disp, \
dispersion_to_variance as nbinom_disp_to_var
from lib5c.algorithms.expected import global_lowess_binned_log_counts
from lib5c.plotters.scatter import scatter
from lib5c.plotters.curve_fits import plot_fit
# resolve primerfile
primerfile = resolve_primerfile(args.observed, args.primerfile)
# load expected
primermap = load_primermap(primerfile)
expected_counts = load_counts(args.expected, primermap)
# resolve xlim and ylim
xlim = list(map(float, args.xlim.strip('()').split(','))) \
if args.xlim is not None else None
ylim = list(map(float, args.ylim.strip('()').split(','))) \
if args.ylim is not None else None
# base value for key rep, will be resolved inside the next block
key_rep = None
# special loading for cross-replicate mode
if 'cross_rep' in args.source:
# expand infiles
expanded_infiles = glob.glob(args.observed)
if len(expanded_infiles) < 2:
raise ValueError('cross-replicate variance estimation requires at '
'least 2 replicates')
# resolve key rep
if args.rep is not None:
for expanded_infile in expanded_infiles:
if args.rep in expanded_infile:
key_rep = expanded_infile
break
# load counts
observed_counts = {
expanded_infile: load_counts(expanded_infile, primermap)
for expanded_infile in expanded_infiles}
else:
observed_counts = load_counts(args.observed, primermap)
print('preparing to plot')
# estimate variance
variance = estimate_variance(
observed_counts, expected_counts, key_rep=key_rep, model=args.model,
source=args.source, fitter='none', min_disp=float(args.min_disp),
min_obs=args.min_obs, min_dist=args.min_dist,
regional=args.region is not None)
# subset region if necessary
if args.region is not None:
# no need to subset observed_counts, they aren't used after variance
# estimation
expected_counts = {args.region: expected_counts[args.region]}
variance = {args.region: variance[args.region]}
# flatten, including var hat if passed
count_types = {'exp': expected_counts, 'var': variance}
if args.variance is not None:
var_hat_counts = load_counts(args.variance, primermap)
count_types['var_hat'] = {args.region: var_hat_counts[args.region]} \
if args.region is not None else var_hat_counts
flat, _, _ = flatten_and_filter_counts(count_types)
# prepare one-dimensional model if we need it
exp_model = np.array(global_lowess_binned_log_counts(
expected_counts, exclude_near_diagonal=True)) \
if args.adjust_x or args.adjust_y else None
# decide units for plotting
x = flat['dist'] if args.x_unit == 'dist' else flat['exp']
var_to_disp = lognorm_var_to_disp if args.model == 'lognorm' \
else nbinom_var_to_disp if args.model == 'nbinom' else lambda a, b: a
disp_to_var = lognorm_disp_to_var if args.model == 'lognorm' \
else nbinom_disp_to_var if args.model == 'nbinom' else lambda a, b: a
if args.y_unit == 'disp':
y = var_to_disp(flat['var'], flat['exp'])
if args.variance is not None:
flat['var_hat'] = \
var_to_disp(flat['var_hat'], exp_model[flat['dist']])\
if args.adjust_y \
else var_to_disp(flat['var_hat'], flat['exp'])
if args.adjust_x:
for d in np.unique(flat['dist']):
flat['var_hat'][flat['dist'] == d] = \
np.mean(flat['var_hat'][flat['dist'] == d])
else:
y = flat['var']
if args.model == 'norm':
disp_to_var = lognorm_disp_to_var_direct
y = disp_to_var(y, flat['exp'])
if args.variance is not None:
if args.adjust_x or args.adjust_y:
disp_hat = var_to_disp(flat['var_hat'], flat['exp'])
if args.adjust_x:
for d in np.unique(flat['dist']):
disp_hat[flat['dist'] == d] = \
np.mean(disp_hat[flat['dist'] == d])
flat['var_hat'] = disp_to_var(
disp_hat, exp_model[flat['dist']] if args.adjust_y
or (args.adjust_x and args.x_unit == 'dist')
else flat['exp'])
# extra check in case disp_to_var made y infinite
idx = np.isfinite(y)
x = x[idx]
y = y[idx]
if args.variance is not None:
flat['var_hat'] = flat['var_hat'][idx]
print('plotting')
if args.variance is not None:
plot_fit(x, y, flat['var_hat'], logx=args.logx, logy=not args.unlogy,
hexbin=not args.scatter, xlabel=args.x_unit,
ylabel=args.y_unit, xlim=xlim, ylim=ylim, outfile=args.outfile)
else:
scatter(x, y, logx=args.logx, logy=not args.unlogy,
hexbin=not args.scatter, xlabel=args.x_unit,
ylabel=args.y_unit, xlim=xlim, ylim=ylim, outfile=args.outfile)