Source code for pipeline.recipereducer

"""
recipereducer is a utility to reduce data using a standard pipeline procedure.
It parses an XML reduction recipe, converts it to pipeline tasks, and executes
the tasks for the given data. It was written to give pipeline developers 
without access to PPRs and/or a PPR generator a way to reduce data using the
latest standard recipe.

Note: multiple input datasets can be specified. Doing so will reduce the data
      as part of the same session.

Example #1: process uid123.tar.gz using the standard recipe. 

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'])

Example #2: process uid123.tar.gz using a named recipe.

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'], 
                                  procedure='procedure_hif.xml')

Example #3: process uid123.tar.gz and uid124.tar.gz using the standard recipe.

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz', 'uid124.tar.gz']) 

Example #4: process uid123.tar.gz, naming the context 'testrun', thus
            directing all weblog output to a directory called 'testrun'. 

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'], name='testrun') 
    
Example #5: process uid123.tar.gz with a log level of TRACE.

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'], loglevel='trace') 
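
Example #6: process uid123.tar.gz, stopping the recipe after the stage whose
            number matches exitstage (the value 5 here is illustrative; stage
            numbering follows the procedure being run).

    import pipeline.recipereducer
    pipeline.recipereducer.reduce(vis=['uid123.tar.gz'], exitstage=5)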

"""
import ast
import collections
import os
import traceback
import xml.etree.ElementTree as ElementTree

import pkg_resources

import pipeline.infrastructure.launcher as launcher
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import exceptions, task_registry, utils

LOG = logging.get_logger(__name__)

RECIPES_DIR = pkg_resources.resource_filename(__name__, 'recipes')

TaskArgs = collections.namedtuple('TaskArgs', 'vis infiles session')
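# TaskArgs bundles the user-supplied inputs that _get_tasks forwards to the
# data import/restore stages: 'vis' and 'session' go to the importdata and
# restoredata tasks listed in _get_tasks below, while 'infiles' is used by
# hsd_restoredata.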


def _create_context(loglevel, plotlevel, name):
    return launcher.Pipeline(loglevel=loglevel, plotlevel=plotlevel,
                             name=name).context


def _get_context_name(procedure):
    root, _ = os.path.splitext(os.path.basename(procedure))
    return 'pipeline-%s' % root
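
# For example, _get_context_name('procedure_hifa_calimage.xml') returns
# 'pipeline-procedure_hifa_calimage'; launcher.Pipeline uses this name for the
# context and hence (see Example #4 above) for the weblog output directory.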


def _get_processing_procedure(procedure: str) -> ElementTree.ElementTree:
    # find the procedure file on disk, then fall back to the standard recipes
    if os.path.exists(procedure):
        procedure_file = os.path.abspath(procedure)
    else:
        procedure_file = os.path.join(RECIPES_DIR, procedure)
    if os.path.exists(procedure_file):
        LOG.info('Using procedure file: %s' % procedure_file)
    else:
        msg = f'Procedure not found: {procedure}'
        LOG.error(msg)
        raise IOError(msg)

    procedure_xml = ElementTree.parse(procedure_file)
    if not procedure_xml:
        msg = f'Could not parse procedure file at {procedure_file}'
        LOG.error(msg)
        raise IOError(msg)

    return procedure_xml
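
# A recipe file is expected to provide a ProcedureTitle plus one
# ProcessingCommand per stage, each with a Command name and an optional
# ParameterSet of Keyword/Value pairs. A minimal sketch, assuming this layout
# (element names are taken from the parsing code in this module; the root tag
# and the parameter shown are placeholders, not taken from a real recipe):
#
#   <ProcessingProcedure>
#       <ProcedureTitle>hifa_calimage</ProcedureTitle>
#       <ProcessingCommand>
#           <Command>hifa_importdata</Command>
#           <ParameterSet>
#               <Parameter>
#                   <Keyword>someparam</Keyword>
#                   <Value>False</Value>
#               </Parameter>
#           </ParameterSet>
#       </ProcessingCommand>
#   </ProcessingProcedure>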


def _get_procedure_title(procedure):
    procedure_xml = _get_processing_procedure(procedure)
    procedure_title = procedure_xml.findtext('ProcedureTitle', default='Undefined')
    return procedure_title


def _get_tasks(context, args, procedure):
    procedure_xml = _get_processing_procedure(procedure)

    commands_seen = []
    for processingcommand in procedure_xml.findall('ProcessingCommand'):
        cli_command = processingcommand.findtext('Command')
        commands_seen.append(cli_command)

        # ignore breakpoints
        if cli_command == 'breakpoint':
            continue

        # skip exportdata when preceded by a breakpoint
        if len(commands_seen) > 1 and commands_seen[-2] == 'breakpoint' \
                and cli_command == 'hif_exportdata':
            continue

        task_class = task_registry.get_pipeline_class_for_task(cli_command)

        task_args = {}

        if cli_command in ['hif_importdata',
                           'hifa_importdata',
                           'hifv_importdata',
                           'hif_restoredata',
                           'hifa_restoredata',
                           'hsd_importdata',
                           'hsdn_importdata']:
            task_args['vis'] = args.vis
            # we might override this later with the procedure definition
            task_args['session'] = args.session

        elif cli_command in ['hsd_restoredata']:
            task_args['infiles'] = args.infiles

        for parameterset in processingcommand.findall('ParameterSet'):
            for parameter in parameterset.findall('Parameter'):
                argname = parameter.findtext('Keyword')
                argval = parameter.findtext('Value')
                task_args[argname] = string_to_val(argval)

        task_inputs = vdp.InputsContainer(task_class, context, **task_args)
        task = task_class(task_inputs)
        task._hif_call = _as_task_call(task_class, task_args)
        # we yield rather than return so that the context can be updated
        # between task executions 
        yield task


def string_to_val(s):
    """
    Convert a string to a Python data type.
    """
    try:
        pyobj = ast.literal_eval(s)
        # seems Tsys wants its groupintent as a string
        # if type(pyobj) in (list, tuple):
        #     pyobj = s
        return pyobj
    except ValueError:
        return s
    except SyntaxError:
        return s
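
# Illustrative conversions of recipe Value strings (the example inputs are
# hypothetical):
#   string_to_val("['BANDPASS', 'PHASE']")  ->  ['BANDPASS', 'PHASE']
#   string_to_val("4.5")                    ->  4.5
#   string_to_val("True")                   ->  True
#   string_to_val("uid123.ms")              ->  'uid123.ms'  (not a literal, returned unchanged)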


def _format_arg_value(arg_val):
    arg, val = arg_val
    return '%s=%r' % (arg, val)


def _as_task_call(task_class, task_args):
    kw_args = list(map(_format_arg_value, task_args.items()))
    return '%s(%s)' % (task_class.__name__, ', '.join(kw_args))
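
# Illustrative result (the class name and arguments are hypothetical): for a
# task class named Importdata and task_args {'vis': ['uid123.ms']},
# _as_task_call returns "Importdata(vis=['uid123.ms'])", which reduce() logs
# before executing each task.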


def reduce(vis=None, infiles=None, procedure='procedure_hifa_calimage.xml',
           context=None, name=None, loglevel='info', plotlevel='default',
           session=None, exitstage=None):
    if vis is None:
        vis = []

    if infiles is None:
        infiles = []

    if context is None:
        name = name if name else _get_context_name(procedure)
        context = _create_context(loglevel, plotlevel, name)
        procedure_title = _get_procedure_title(procedure)
        context.set_state('ProjectStructure', 'recipe_name', procedure_title)

    if session is None:
        session = ['default'] * len(vis)

    task_args = TaskArgs(vis, infiles, session)
    task_generator = _get_tasks(context, task_args, procedure)

    try:
        while True:
            task = next(task_generator)
            LOG.info('Executing pipeline task %s' % task._hif_call)

            try:
                result = task.execute(dry_run=False)
                result.accept(context)
            except Exception as ex:
                # Log message if an exception occurred that was not handled by
                # standardtask template (not turned into failed task result).
                LOG.error('Unhandled error in recipereducer while running pipeline task %s.' % task._hif_call)
                traceback.print_exc()
                return context

            tracebacks = utils.get_tracebacks(result)
            if len(tracebacks) > 0:
                previous_tracebacks_as_string = "{}".format("\n".join([tb for tb in tracebacks]))
                raise exceptions.PipelineException(previous_tracebacks_as_string)
            elif result.stage_number is exitstage:
                break

    except StopIteration:
        pass
    finally:
        LOG.info('Saving context...')
        context.save()

    return context