Source code for lib5c.tools.pipeline
[docs]def add_pipeline_tool(parser):
pipeline_parser = parser.add_parser(
'pipeline',
prog='lib5c pipeline',
help='run entire pipeline',
)
pipeline_parser.add_argument(
'-R', '--remote_scheduler',
action='store_true',
help='''Use a remote luigi scheduler instead of the local scheduler.''')
pipeline_parser.add_argument(
'-t', '--task_class',
type=str,
help='''Custom Task class defining the pipeline, in the form
'my_module.MyTask' where my_module is available on the PYTHONPATH.''')
pipeline_parser.add_argument(
'-d', '--task_directory',
type=str,
help='''Directory that the module containing the custom Task class
resides in. This directory will be added to sys.path.''')
pipeline_parser.add_argument(
'-w', '--num_workers',
type=int,
default=1,
help='''Set the number of workers to use. The default is 1.''')
pipeline_parser.add_argument(
'-l', '--log_level',
type=str,
choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default='ERROR',
help='''Set luigi's log level. Default is ERROR, which should at least
provide a starting point if something breaks down.''')
pipeline_parser.set_defaults(func=pipeline_tool)
[docs]def pipeline_tool(parser, args):
import os
import sys
import luigi
from lib5c.contrib.luigi.pipeline import PipelineTask
from lib5c.contrib.luigi.config import drop_config_file
if not os.path.exists('luigi.cfg'):
print('no luigi.cfg found in current directory')
print('dropping example luigi.cfg file...')
drop_config_file()
else:
local_scheduler = not args.remote_scheduler
cmdline_args = ['--workers', str(args.num_workers),
'--log-level', args.log_level]
if args.task_class is None:
main_task_cls = PipelineTask
luigi.run(cmdline_args=cmdline_args,
main_task_cls=main_task_cls,
local_scheduler=local_scheduler)
else:
if args.task_directory is not None:
sys.path.append(args.task_directory)
mod, cls = args.task_class.rsplit('.', 1)
cmdline_args.extend(['--module', mod, cls])
luigi.run(cmdline_args=cmdline_args,
local_scheduler=local_scheduler)