lib5c.contrib.luigi.pipeline module

Module implementing one particular strategy for wiring together the luigi Task subclasses defined in lib5c.contrib.luigi.tasks into a complete pipeline.

The pipeline is organized as a tree of Tasks that maps one-to-one onto a tree of output directories. Each Task in the tree inherits from the mixin class TreeMixin and defines a directory string parameter, which represents the output directory for that Task. Task instances can be reconstituted from directory strings via the directory_to_task() function.

The directory_to_task() function uses the table DictParameter of the TreeMixin, which maps user-selected short names for parameterized Tasks to Task class names as well as detailed parameters. An example of an entry in the table is:

"bin_amean_20_8": ["MakeBinned", {"window_function": "amean",
                                  "bin_width": 8000,
                                  "window_width": 20000}]

where the key, “bin_amean_20_8”, is the user-selected short name for this particular parameterization of the MakeBinned Task class, and the value is a list of two elements. The first element is the Task class name as a string (in this case, MakeBinned, which extends lib5c.contrib.luigi.tasks.BinTask and mixes in TreeMixin). The second element is a dict containing the parameters to construct the Task with. With this entry in the table, when a folder named “bin_amean_20_8” occurs within the directory string, it will be interpreted as a MakeBinned Task with the parameters specified in this table entry.
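The lookup described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the code in lib5c: the helper name resolve_folder() and the parent folder name "raw" are hypothetical, and the real directory_to_task() goes on to instantiate the Task class rather than returning its name.

```python
import posixpath

# The example table entry from the text above.
table = {
    "bin_amean_20_8": ["MakeBinned", {"window_function": "amean",
                                      "bin_width": 8000,
                                      "window_width": 20000}],
}


def resolve_folder(directory, table):
    """Hypothetical helper: look up the last folder of a directory string.

    Returns the Task class name and a copy of its parameter dict.
    """
    folder = posixpath.basename(directory.rstrip("/"))
    class_name, params = table[folder]
    return class_name, dict(params)


# "raw" is an assumed name for the upstream folder.
class_name, params = resolve_folder("raw/bin_amean_20_8", table)
```

Only the last folder of the directory string selects the table entry; the folders before it identify the upstream Tasks.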

The upstream Task that a particular Task depends on (i.e., its parent in the tree) can also be reconstituted by splitting off the last folder level in the directory string and calling directory_to_task() on what remains. This logic is implemented in TreeMixin.preceding_task(), which allows any Task in the tree to find the Tasks that precede it in the pipeline.
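As a rough sketch of the "split off the last folder level" step, the following standalone functions walk a directory string back to its root. The function names and the folder names in the example are hypothetical; the actual TreeMixin.preceding_task() returns Task instances rather than strings.

```python
import posixpath


def preceding_directory(directory):
    """Drop the last folder level; return None once the root is reached."""
    parent = posixpath.dirname(directory.rstrip("/"))
    return parent or None


def ancestry(directory):
    """List every directory string from the root down to `directory`."""
    chain = []
    current = directory.rstrip("/")
    while current:
        chain.append(current)
        current = preceding_directory(current)
    return chain[::-1]


# Folder names here are assumptions chosen for illustration.
chain = ancestry("raw/qnorm/bin_amean_20_8")
```

Each element of the resulting chain is itself a valid directory string, so the same lookup can be applied at every level of the tree.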

TreeMixin also describes rep and outfile_pattern parameters. Together with directory, these parameters specify the exact output file of running a particular parameterized Task on one specific replicate, using the logic implemented in TreeMixin.output().
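One plausible way these three parameters combine into a path is shown below. This sketch assumes that outfile_pattern is a %-style format string with a single slot for the replicate name (for example "%s.counts"); that format, the helper name, and the replicate name are assumptions, not something this page confirms.

```python
import posixpath


def output_path(directory, rep, outfile_pattern="%s.counts"):
    """Hypothetical helper: join the directory with the interpolated pattern."""
    return posixpath.join(directory, outfile_pattern % rep)


# "ES_Rep1" is an assumed replicate name.
path = output_path("raw/bin_amean_20_8", "ES_Rep1")
```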

The pipeline is orchestrated by an overall WrapperTask called PipelineTask which stores the table and passes it through to each TreeMixin Task. It also deduces the all_reps list (by peeking at the keys of RawCounts.countsfiles using the luigi config file) and passes it through to each TreeMixin Task as well. It stores a list of directory strings (representing leaf Tasks) in a tasks ListParameter. As a WrapperTask, it wraps all the leaf Tasks in tasks and all replicates in all_reps as appropriate. The leaf Tasks in turn use their directory strings to figure out what Tasks they depend on. In this way the entire tree of pipeline Tasks is created from just one PipelineTask.

class lib5c.contrib.luigi.pipeline.DetermineBins(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.tasks.DetermineBinsTask

Pipeline Task for DetermineBinsTask (the step which decides how to bin the 5C regions).

This Task is pre-wired to depend on the PrimerFile pipeline Task, and to write its output to an output folder called bedfiles/.

output()[source]

Implementation of output(), pre-wired to write the output to the bedfiles/ folder.

Returns

The Target of this Task.

Return type

luigi.Target

requires()[source]

Implementation of requires(), pre-wired to depend on the PrimerFile pipeline Task.

Returns

The Task that this Task depends on.

Return type

luigi.Task

class lib5c.contrib.luigi.pipeline.JointExpressInnerTask(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.JointInnerParallelMixin, lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.ExpressTask

Inner Task class for the MakeJointExpress JointTask.

class lib5c.contrib.luigi.pipeline.JointInnerMixin[source]

Bases: object

Mixin class for inner Tasks wrapped by JointTask.

The inner Task of a JointTask depends on the preceding Task’s output for all replicates.

This mixin provides a helper function _match_input() which subclasses can use to get a glob-based pattern that matches all the input files for the Task which precedes this Task. CmdTasks inheriting from this mixin only need to use this approach if they must describe all their input files using a single string (see JointExpressInnerTask for an example). CmdTasks that can simply list the exact input files they depend on can use something like:

[i.path for i in self.input()]

See QnormInnerTask for an example of this second approach.
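The first approach, a single glob-style string that covers every replicate, might look like the sketch below. It assumes a %-style outfile_pattern such as "%s.counts" and hypothetical file names; the real _match_input() may build its pattern differently.

```python
import fnmatch
import posixpath


def match_input_pattern(parent_directory, outfile_pattern="%s.counts"):
    """Hypothetical helper: put a wildcard where the replicate name would go."""
    return posixpath.join(parent_directory, outfile_pattern % "*")


pattern = match_input_pattern("raw/qnorm")

# Assumed file listing used only to demonstrate what the pattern matches.
candidates = [
    "raw/qnorm/ES_Rep1.counts",
    "raw/qnorm/NPC_Rep1.counts",
    "raw/qnorm/notes.txt",
]
matched = [c for c in candidates if fnmatch.fnmatchcase(c, pattern)]
```

A single pattern like this is convenient for command-line tools that accept one glob argument, at the cost of also matching any unrelated file that happens to fit the pattern.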

A basic implementation of requires() is provided here and should work in most cases, but Task classes inheriting from JointInnerMixin must still define their own implementation of output().

requires()[source]

Basic implementation of requires() for inner Tasks of a JointTask.

This basic implementation assumes that the inner Task depends on the locus file and the preceding Task for each replicate in all_reps.

Subclasses may override this if they depend on more than just these inputs.

Returns

The Tasks that this inner Task depends on.

Return type

list of luigi.Task

class lib5c.contrib.luigi.pipeline.JointInnerParallelMixin[source]

Bases: lib5c.contrib.luigi.pipeline.JointInnerMixin

Mixin class providing a simple implementation of output() for Task classes inheriting from JointInnerMixin.

output()[source]

Simple implementation of output() for Task classes inheriting from JointInnerMixin.

This implementation assumes that the output files are parallel to the input files (i.e., there is one for each replicate and it can be obtained by interpolating rep into the outfile_pattern).

Returns

The Targets of this inner Task.

Return type

list of luigi.Target

class lib5c.contrib.luigi.pipeline.JointTask(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, luigi.task.WrapperTask

Mixin class for pipeline Tasks that operate on input from all replicates.

Tasks inheriting from JointTask become WrapperTasks. One such WrapperTask can be created for each replicate, but all of them depend on the same inner Task, which does the actual work. In terms of the overall pipeline flow, this allows a folder in a directory string to map to a JointTask, which can be instantiated once per replicate via the rep kwarg of the TreeMixin. All the JointTask instances depend on a single inner Task that inherits from JointInnerMixin.

Tasks inheriting from JointTask must implement get_inner_task_class(), which should return a Task class which inherits from JointInnerMixin and actually does the work.

Since get_inner_task_class() just returns a Task class which must still be instantiated with the proper parameters, JointTask provides an overridable hook, get_inner_task_params(), to allow Task classes which inherit from JointTask to manually pass their parameters through to the inner Task. See MakeQnorm.get_inner_task_params() for an example.

The related helper function get_inner_task_param_dict() helps to simplify this process by automatically passing through key TreeMixin parameters like table, directory, all_reps, and the @visualizable visualization hook parameters.
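The relationship between the hook and the helper can be illustrated with a plain-Python sketch that does not use luigi. The class names JointSketch and QnormSketch are hypothetical stand-ins, only three of the core parameters are shown, and the choice to let hook values be merged on top of the core defaults is an assumption.

```python
class JointSketch:
    """Hypothetical stand-in for a JointTask, without any luigi machinery."""

    def __init__(self, table, directory, all_reps):
        self.table = table
        self.directory = directory
        self.all_reps = all_reps

    def get_inner_task_params(self):
        # Hook: subclasses override this to supply extra parameters.
        return {}

    def get_inner_task_param_dict(self):
        # Helper: start from the core tree parameters, then add the hook's.
        params = {
            "table": self.table,
            "directory": self.directory,
            "all_reps": self.all_reps,
        }
        params.update(self.get_inner_task_params())
        return params


class QnormSketch(JointSketch):
    """Hypothetical subclass that passes one step-specific parameter through."""

    def __init__(self, table, directory, all_reps, regional=False):
        super().__init__(table, directory, all_reps)
        self.regional = regional

    def get_inner_task_params(self):
        return {"regional": self.regional}


inner_params = QnormSketch({}, "raw/qnorm", ["A", "B"],
                           regional=True).get_inner_task_param_dict()
```

Keeping the core parameters in the helper means a subclass only has to list the parameters that are specific to its own step.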

get_inner_task_class()[source]
get_inner_task_param_dict()[source]

Constructs the complete dict of params for inner task instantiation.

Provides some important core defaults in the context of the tree pipeline, and injects whatever parameters are returned by get_inner_task_params().

This is a helper function; subclasses should not override it and should override get_inner_task_params() instead.

Returns

The complete dict of params.

Return type

dict

get_inner_task_params()[source]

Hook to allow subclasses to supply extra parameters to their inner Tasks. Subclasses should override this function.

Returns

Extra parameters to be supplied to the inner task upon construction.

Return type

dict

get_rep_index()[source]

Returns the index of the replicate this Task wraps the output for among all the replicates (in the order of self.all_reps).

Returns

The index of the replicate this Task wraps the output for among all the replicates (in the order of self.all_reps).

Return type

int

outfile_pattern = <luigi.parameter.Parameter object>
output()[source]

Universal implementation of output() for JointTasks.

This implementation simply instantiates the inner Task and asks it for its outputs, returning the one that corresponds to the replicate of this JointTask. The assumption here is that the inner Task class’s output() will be a list whose elements correspond to the replicates in all_reps.
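The selection step can be sketched as an index lookup. The helper name, replicate names, and file paths below are hypothetical, and plain strings stand in for luigi Targets.

```python
def joint_output(inner_outputs, all_reps, rep):
    """Hypothetical helper: pick the inner output that belongs to `rep`.

    Assumes inner_outputs is ordered the same way as all_reps.
    """
    return inner_outputs[all_reps.index(rep)]


# Assumed replicate names and output paths.
all_reps = ["ES_Rep1", "ES_Rep2", "NPC_Rep1"]
inner_outputs = ["raw/qnorm/%s.counts" % r for r in all_reps]
selected = joint_output(inner_outputs, all_reps, "ES_Rep2")
```

If the inner Task returned its outputs in a different order from all_reps, this lookup would silently pick the wrong file, which is why the ordering assumption matters.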

Returns

The Target of this JointTask.

Return type

luigi.Target

rep = <luigi.parameter.Parameter object>
requires()[source]

Universal implementation of requires() for JointTasks.

Simply put, the JointTask depends on its inner Task class, instantiated using the parameters obtained from get_inner_task_params() via get_inner_task_param_dict().

Returns

The Task instance of the inner Task that this WrapperTask depends on.

Return type

luigi.Task

class lib5c.contrib.luigi.pipeline.MakeBinned(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.BinTask

Pipeline Task class for the binning step.

Unlike most countsfile-to-countsfile steps, the binning step needs to use two different locus Tasks as input: the primerfile and the binfile. Therefore, this class must provide a custom implementation of requires() to specify this.

bin_width = <luigi.parameter.IntParameter object>
requires()[source]

Depends on both the binfile (represented by a DetermineBins instance) and the primerfile (represented by the PrimerFile instance) in addition to the preceding Task.

Returns

The Tasks this Task depends on.

Return type

tuple of luigi.Task

class lib5c.contrib.luigi.pipeline.MakeCrossVariance(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.CrossVarianceTask

Pipeline Task for the cross-replicate variance modeling step.

Even though this Task depends on multiple replicates, it is not implemented as a JointTask.

requires()[source]

Depends on the preceding Task for the same replicate (assumed to be the expected counts) and the Task that precedes that Task (assumed to be the observed counts) for all replicates in this Task’s condition.

This Task’s condition is inferred to be the first condition in the comma-separated string parameter conditions that is a substring of rep. Other replicates match this condition if this condition is also a substring of their replicate names.
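The substring rule described above can be sketched as follows. The function names and replicate names are hypothetical, and returning None or an empty list when no condition matches is an assumption, since the text does not say what happens in that case.

```python
def infer_condition(rep, conditions):
    """Return the first condition in the comma-separated string found in rep."""
    for condition in conditions.split(","):
        if condition in rep:
            return condition
    return None  # assumed behavior when nothing matches


def reps_in_condition(rep, conditions, all_reps):
    """List the replicates whose names contain the condition inferred for rep."""
    condition = infer_condition(rep, conditions)
    if condition is None:
        return []
    return [r for r in all_reps if condition in r]


# Assumed replicate names.
all_reps = ["ES_Rep1", "ES_Rep2", "NPC_Rep1"]
same_condition = reps_in_condition("ES_Rep2", "ES,NPC", all_reps)
```

Because matching is by substring and the first match wins, the order of conditions matters when one condition name is contained in another.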

Returns

The Tasks this Task depends on. The first Task is the locus info Task, the second is the expected Task for this replicate, and the remaining Tasks in the list are observed Tasks for all replicates in the same condition as this replicate.

Return type

list of luigi.Task

class lib5c.contrib.luigi.pipeline.MakeExpected(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.ExpectedTask

Pipeline Task class for the expected modeling step. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeExpress(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.ExpressTask

Pipeline Task class for the express step. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeIced(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.IcedTask

Pipeline Task class for the ICED balancing step. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeInteractionScores(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.InteractionScoreTask

Pipeline Task class for InteractionScoreTask. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeJointExpress(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.JointTask

Outer wrapper pipeline JointTask for the joint express step.

get_inner_task_class()[source]

Points to JointExpressInnerTask, the inner Task for the joint express step.

Returns

The inner Task class for this JointTask.

Return type

luigi.Task

heatmap = <luigi.parameter.BoolParameter object>
heatmap_outdir = <luigi.parameter.Parameter object>
run()
class lib5c.contrib.luigi.pipeline.MakeKR(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.KnightRuizTask

Pipeline Task class for the Knight-Ruiz balancing step. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeLegacyPvaluesOne(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.LegacyPvaluesOneTask

Pipeline Task for an old version of the p-value calling step. Deprecated.

requires()[source]

Unlike the modern PvaluesTask which depends on obs, exp, and var, this old version only used the obs and the exp.

class lib5c.contrib.luigi.pipeline.MakeLogged(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.LogTask

Pipeline Task class for LogTask. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeObsMinusExp(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.SubtractTask

Pipeline Task class for the obs-exp step (analogous to the obs/exp step but for data that have already been log-transformed).

requires()[source]

Depends on both the preceding Task (assumed to be the expected counts) and the Task that precedes that Task (assumed to be the observed counts).

Returns

The Tasks this Task depends on.

Return type

tuple of luigi.Task

class lib5c.contrib.luigi.pipeline.MakeObsOverExp(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.DivideTask

Pipeline Task class for the obs/exp step.

requires()[source]

Depends on both the preceding Task (assumed to be the expected counts) and the Task that precedes that Task (assumed to be the observed counts).

Returns

The Tasks this Task depends on.

Return type

tuple of luigi.Task

class lib5c.contrib.luigi.pipeline.MakePvalues(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.PvalueTask

Pipeline Task for the p-value calling step.

requires()[source]

Depends on three Tasks: the preceding Task (assumed to be the variance counts), the Task that precedes that Task (assumed to be the expected counts) and the Task that precedes that Task (assumed to be the observed counts).
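In terms of directory strings, this means the p-value folder is expected to sit three levels below the observed counts. The sketch below recovers the three upstream directories. The helper name and the folder names are hypothetical, and raising an error for a directory that is too shallow is an assumption.

```python
import posixpath


def upstream_directories(directory, depth):
    """Hypothetical helper: the `depth` nearest ancestor directories, nearest first."""
    result = []
    current = directory.rstrip("/")
    for _ in range(depth):
        current = posixpath.dirname(current)
        if not current:
            raise ValueError("directory string has fewer than %d ancestors" % depth)
        result.append(current)
    return result


# Assumed folder names: observed -> expected -> variance -> p-values.
var_dir, exp_dir, obs_dir = upstream_directories(
    "raw/expected/variance/pvalues", 3)
```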

Returns

The Tasks this Task depends on.

Return type

tuple of luigi.Task

class lib5c.contrib.luigi.pipeline.MakeQnorm(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.JointTask

Outer wrapper pipeline JointTask for the qnorm step.

averaging = <luigi.parameter.BoolParameter object>
condition_on = <luigi.parameter.Parameter object>
get_inner_task_class()[source]

Points to QnormInnerTask, the inner Task for the qnorm step.

Returns

The inner Task class for this JointTask.

Return type

luigi.Task

get_inner_task_params()[source]

Passes through all the parameters for the qnorm step.

Returns

The parameters for the qnorm step.

Return type

dict

heatmap = <luigi.parameter.BoolParameter object>
heatmap_outdir = <luigi.parameter.Parameter object>
reference = <luigi.parameter.Parameter object>
regional = <luigi.parameter.BoolParameter object>
run()
class lib5c.contrib.luigi.pipeline.MakeQvalues(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.QvaluesTask

Pipeline Task class for the multiple testing correction step, which converts p-values to q-values. All functionality is handled by PerRepSimpleTreeMixin.

Note that the thresholding step performs its own multiple testing correction when parameterized with bh_fdr=True, so this step is not needed when thresholding is run that way.

class lib5c.contrib.luigi.pipeline.MakeRaw(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, luigi.task.Task

Pipeline Task for performing the “raw” step of the pipeline.

This step doesn’t actually do anything, so it just copies over the input countsfile (which is actually represented by a RawCounts Task) into the output directory tree. By having a separate step for this we guarantee that a) a raw countsfile can be found with a predictable name (in agreement with the replicate names which are set by the keys of RawCounts.countsfiles) and in a predictable spot in the output directory structure, and b) the raw countsfile can be visualized using the same visualization hooks as any other step.

heatmap = <luigi.parameter.BoolParameter object>
heatmap_outdir = <luigi.parameter.Parameter object>
run()
class lib5c.contrib.luigi.pipeline.MakeRemoved(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.OutliersTask

Pipeline Task class for the high outlier removal step. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeSmoothed(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.SmoothTask

Pipeline Task class for the smoothing step. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeSpline(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin, lib5c.contrib.luigi.tasks.SplineTask

Pipeline Task class for the explicit spline normalization step. All functionality is handled by PerRepSimpleTreeMixin.

class lib5c.contrib.luigi.pipeline.MakeThreshold(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.JointInnerMixin, lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.ThresholdTask

Pipeline Task for the loop call thresholding step.

This Task is implemented as if it were the inner Task of a JointTask, but since there is only one ThresholdTask for all replicates, it does not need a corresponding WrapperTask to wrap itself across replicates.

It gets its implementation of requires() from JointInnerMixin, which correctly depends on the output of the preceding Task (assumed to be the p-values) across all_reps.

output()[source]

Specifies the output file locations for the thresholding step.

These locations are controlled by the outfile_pattern (countsfile of final cluster assignments), dataset_outfile (table of complete results), and kappa_confusion_outfile (text file of summary information and concordance metrics).

Returns

The Targets resulting from this Task.

Return type

tuple of luigi.Target

class lib5c.contrib.luigi.pipeline.MakeVariance(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.VarianceTask

Pipeline Task for the variance modeling step.

requires()[source]

Depends on both the preceding Task (assumed to be the expected counts) and the Task that precedes that Task (assumed to be the observed counts).

Returns

The Tasks this Task depends on.

Return type

tuple of luigi.Task

class lib5c.contrib.luigi.pipeline.PerRepSimpleTreeMixin[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin

Mixin class that adds the most common implementation of requires() to TreeMixin.

Most pipeline Tasks depend on two inputs: a primerfile or binfile, and the immediately preceding countsfile for the rep of the child Task.

Pipeline Tasks that depend on more than one countsfile (e.g., p-value calling), or all replicates (e.g., thresholding) cannot use this mixin, and instead must inherit from TreeMixin and define their own implementation of requires().

requires()[source]
class lib5c.contrib.luigi.pipeline.PipelineTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

Overall wrapper Task that orchestrates the entire pipeline.

Running this Task runs every leaf Task in the tasks ListParameter as well as all parent Tasks needed to get from the root (raw input countsfiles) to those leaves.

Tasks should be specified in the tasks ListParameter in the form of directory strings to the leaf Tasks (final step in a chain of Tasks).

Individual folders in the directory strings in tasks will be converted to properly parameterized Task instances via the table DictParameter, which should map folder names to lists of two items: the appropriate pipeline Task class name as a string, and a dict of parameters to instantiate that Task class with. See the module docstring for an example.

The leaf Tasks will automatically be parallelized across all_reps unless they are MakeThreshold (the Task class for which rep is always None).
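The fan-out over replicates can be sketched as below. This is a simplified illustration rather than the code in PipelineTask.requires(): the helper name, folder names, and replicate names are hypothetical, and (directory, rep) pairs stand in for Task instances.

```python
def expand_leaves(tasks, table, all_reps):
    """Hypothetical helper: pair each leaf directory with the reps it runs on."""
    jobs = []
    for directory in tasks:
        folder = directory.rstrip("/").split("/")[-1]
        class_name = table[folder][0]
        if class_name == "MakeThreshold":
            # One Task for all replicates, so rep is None.
            jobs.append((directory, None))
        else:
            jobs.extend((directory, rep) for rep in all_reps)
    return jobs


# Assumed table entries and leaf directories.
table = {
    "pvalues": ["MakePvalues", {}],
    "threshold": ["MakeThreshold", {}],
}
jobs = expand_leaves(["raw/pvalues", "raw/pvalues/threshold"],
                     table, ["ES_Rep1", "NPC_Rep1"])
```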

requires()[source]

Deduces all_reps and wraps all the leaf Tasks in tasks over all replicates if appropriate, passing through table and all_reps.

table = <luigi.parameter.DictParameter object>
tasks = <luigi.parameter.ListParameter object>
class lib5c.contrib.luigi.pipeline.PrimerFile(*args, **kwargs)[source]

Bases: luigi.task.ExternalTask

Pipeline Task for finding the input primerfile on the disk.

output()[source]

Implementation of output().

Returns

A LocalTarget pointing to this Task’s primerfile parameter, which should be the location of the input primerfile on the disk.

Return type

luigi.Target

primerfile = <luigi.parameter.Parameter object>
class lib5c.contrib.luigi.pipeline.QnormInnerTask(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.JointInnerParallelMixin, lib5c.contrib.luigi.pipeline.TreeMixin, lib5c.contrib.luigi.tasks.QnormTask

Inner Task class for the MakeQnorm JointTask.

class lib5c.contrib.luigi.pipeline.RawCounts(*args, **kwargs)[source]

Bases: lib5c.contrib.luigi.pipeline.TreeMixin, luigi.task.ExternalTask

Pipeline Task for finding the raw input countsfiles on the disk.

This step is not resolved through the table, but instead uses its own DictParameter countsfiles which should map replicate names to the paths of the raw input countsfiles on the disk.

countsfiles = <luigi.parameter.DictParameter object>
outfile_pattern = <luigi.parameter.Parameter object>
output()[source]

Looks up the location of the countsfile for this replicate using the countsfiles DictParameter and returns a LocalTarget pointing to it.

Returns

The Target corresponding to the raw input countsfile represented by this Task.

Return type

luigi.Target

rep = <luigi.parameter.Parameter object>
class lib5c.contrib.luigi.pipeline.TreeMixin[source]

Bases: object

Core mixin class for pipeline Tasks. See the module docstring for more details.

If mixed with a lib5c.contrib.luigi.tasks.CmdTask subclass, the only luigi function that the derived class needs to implement is requires().

all_reps = <luigi.parameter.ListParameter object>
directory = <luigi.parameter.Parameter object>
locus_info_task()[source]

Returns the Task instance corresponding to the primerfile or binfile needed by this Task.

Returns

The Task instance corresponding to the primerfile or binfile needed by this Task.

Return type

luigi.Task

outfile_pattern = <luigi.parameter.Parameter object>
output()[source]

Returns the luigi Target corresponding to the output file that is the direct result of running this Task.

Returns

The Target corresponding to the output file that is the direct result of running this Task.

Return type

luigi.Target

preceding_task(rep=None)[source]

Returns the Task instance that precedes this Task.

Parameters

rep (str, optional) – The replicate name to parameterize the parent Task with. Pass None if the Task is not a per-rep Task.

Returns

The Task instance that precedes this Task.

Return type

luigi.Task

rep = <luigi.parameter.Parameter object>
table = <luigi.parameter.DictParameter object>
lib5c.contrib.luigi.pipeline.directory_to_task(directory, table, all_reps, **kwargs)[source]

Converts a directory to a TreeMixin Task class instance, using a provided table.

Parameters
  • directory (str) – The directory identifying this task.

  • table (Dict[str, Tuple[str, dict[str, Any]]]) – A map from directory parts to (Task class name, param dict) tuples.

  • all_reps (List[str]) – A list of all the replicates.

  • kwargs (additional keyword arguments) – Will be passed to the new Task instance. The most common kwarg is ‘rep’.

Returns

The specified Task instance.

Return type

luigi.Task