Source code for pipeline.h.cli.utils

import pprint

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.api as api
import pipeline.infrastructure.argmapper as argmapper
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import exceptions, task_registry, utils
from . import cli
from .. import heuristics

LOG = infrastructure.get_logger(__name__)


[docs]def get_context(): return cli.stack[cli.PIPELINE_NAME].context
[docs]def get_output_dir(): context = get_context() return context.output_dir
[docs]def get_ms(vis): context = get_context() return context.observing_run.get_ms(name=vis)
[docs]def get_heuristic(arg): if issubclass(arg, api.Heuristic): return arg() if callable(arg): return arg # TODO LOOK IN HEURISTICS MODULE # If the argument is a non-empty string, try to get the class with the # given name, or if that class doesn't exist, wrap the input in an # EchoHeuristic if isinstance(arg, str) and arg: packages = arg.split('.') module = '.'.join(packages[:-1]) # if arg was a raw string with no dots, module is empty if not module: return heuristics.EchoHeuristic(arg) try: m = __import__(module) except ImportError: return heuristics.EchoHeuristic(arg) for package in packages[1:]: m = getattr(m, package, heuristics.EchoHeuristic(arg)) return m() return heuristics.EchoHeuristic(arg)
[docs]def execute_task(context, casa_task, casa_args): pipelinemode = casa_args.get('pipelinemode', None) dry_run = casa_args.get('dryrun', None) accept_results = casa_args.get('acceptresults', True) # get the pipeline task inputs task_inputs = _get_task_inputs(casa_task, context, casa_args) # print them if necessary if pipelinemode == 'getinputs': _print_inputs(casa_task, casa_args, task_inputs) return None # Execute the class, collecting the results results = _execute_task(casa_task, task_inputs, dry_run) # write the command invoked (eg. hif_setjy) to the result so that the # weblog can print help from the XML task definition rather than the # python class results.taskname = casa_task # accept the results if desired if accept_results and not dry_run: _merge_results(context, results) tracebacks = utils.get_tracebacks(results) if len(tracebacks) > 0: previous_tracebacks_as_string = "{}".format("\n".join([tb for tb in tracebacks])) raise exceptions.PipelineException(previous_tracebacks_as_string) return results
def _get_task_inputs(casa_task, context, casa_args): # convert the CASA arguments to pipeline arguments, renaming and # converting as necessary. pipeline_task_class = task_registry.get_pipeline_class_for_task(casa_task) task_args = argmapper.convert_args(pipeline_task_class, casa_args) inputs = vdp.InputsContainer(pipeline_task_class, context, **task_args) return inputs def _execute_task(casa_task, task_inputs, dry_run): # Given the class and CASA name of the stage and the list # of stage arguments, compute and return the results. # Find the task and run it pipeline_task_cls = task_registry.get_pipeline_class_for_task(casa_task) task = pipeline_task_cls(task_inputs) # Reporting stuff goes here # Error checking ? return task.execute(dry_run=dry_run) def _merge_results(context, results): try: results.accept(context) except Exception: LOG.critical('Warning: Check merge to context for {}'.format(results.__class__.__name__)) raise def _print_inputs(casa_task, casa_args, task_inputs): pipeline_class = task_registry.get_pipeline_class_for_task(casa_task) task_args = argmapper.convert_args(pipeline_class, casa_args) pipeline_perspective = {} for arg in task_args: if hasattr(task_inputs, arg): pipeline_perspective[arg] = getattr(task_inputs, arg) casa_args = argmapper.task_to_casa(casa_task, pipeline_perspective) print('Pipeline-derived inputs for {!s}:'.format(casa_task)) pprint.pprint(casa_args) # Resetting pipelinemode after a call to getinputs is not a good idea, as # it makes it very easy to unintentionally execute and commit tasks to the # context, plus the user most probably wants to tweak and call getinputs # multiple times until the parameters look correct # a=inspect.stack() # stacklevel=0 # for k in range(len(a)): # if (string.find(a[k][1], 'ipython console') > 0): # stacklevel=k # break # myf=sys._getframe(stacklevel).f_globals # myf['pipelinemode'] = 'automatic'