# Source code for pipeline.infrastructure.jobrequest

import copy
import inspect
import itertools
import operator
import os
import platform
import re
import sys

import almatasks
import casaplotms
import casatasks

from . import logging
from . import utils

# Logger for this module. Note that `logging` here is the pipeline's own
# logging module (imported via `from . import logging`), not the stdlib one.
LOG = logging.get_logger(__name__)

# logger for keeping a trace of CASA task and CASA tool calls.
# The filename incorporates the hostname to keep MPI client files distinct.
# Only the part of the node name before the first '.' is used, giving a file
# named casacalls-<host>.txt. Records are written as the bare message
# ('%(message)s'); addToCasaLog=False presumably keeps them out of the CASA
# log itself - confirm against pipeline.infrastructure.logging.
CASACALLS_LOG = logging.get_logger('CASACALLS', stream=None, format='%(message)s', addToCasaLog=False,
                                   filename='casacalls-{!s}.txt'.format(platform.node().split('.')[0]))

# functions to be executed just prior to and immediately after execution of the
# CASA task, providing a way to collect metrics on task execution.
# JobRequest.execute calls each hook with the JobRequest as its only argument.
# Neither list is consulted for a dry run, and the post-hooks are run from a
# `finally` clause, so they also run when the task raises.
PREHOOKS = []
POSTHOOKS = []


class FunctionArg(object):
    """
    A named (keyword) argument of a function or method call.

    The string form is ``name=repr(value)``, i.e. the argument as it would be
    written in a Python call.
    """

    def __init__(self, name, value):
        # name of the parameter the value is bound to
        self.name = name
        # the argument value itself, stored by reference (not copied)
        self.value = value

    def __str__(self):
        template = '{!s}={!r}'
        return template.format(self.name, self.value)

    def __repr__(self):
        template = 'FunctionArg({!r}, {!r})'
        return template.format(self.name, self.value)
class NamelessArg(object):
    """
    An unnamed argument of a function call, such as a surplus positional
    argument that does not correspond to a named parameter.

    The string form is simply ``str(value)``.
    """

    def __init__(self, value):
        # the argument value, stored by reference (not copied)
        self.value = value

    def __str__(self):
        text = str(self.value)
        return text

    def __repr__(self):
        template = 'NamelessArg({!r})'
        return template.format(self.value)
def alphasort(argument):
    """
    Return an argument with values sorted so that the log record is easier to
    compare to other pipeline executions.

    Two kinds of argument are sorted:

    * 'inpfile' given as a list of commands: every run of consecutive commands
      that do not contain "mode='summary'" is sorted with utils.natural_sort.
      The summary commands keep their positions, as they delimit the runs.
    * 'asis' (space-separated) and 'spw', 'field', 'intent' (comma-separated)
      given as a string containing the separator: the separated items are
      sorted with utils.natural_sort and re-joined with the same separator.

    Any other named argument is returned as a new FunctionArg holding a deep
    copy of the original value.

    :param argument: the FunctionArg or NamelessArg to sort
    :return: the NamelessArg unchanged, or a new value-sorted FunctionArg
    """
    if isinstance(argument, NamelessArg):
        return argument

    # maps argument name to the separator between items in its string value
    separators = {
        'asis': ' ',
        'spw': ',',
        'field': ',',
        'intent': ','
    }

    name = argument.name
    # deepcopy as we sort in place and don't want to modify the original
    value = copy.deepcopy(argument.value)

    if name == 'inpfile' and isinstance(value, list):
        # get the indices of commands that are not summaries.
        apply_cmd_idxs = [idx for idx, cmd in enumerate(value) if "mode='summary'" not in cmd]

        # group the indices into consecutive ranges, i.e., between flagdata
        # summaries: within a run of consecutive indices, (position - index)
        # is constant. Commands within these ranges can be sorted.
        for _, group in itertools.groupby(enumerate(apply_cmd_idxs), lambda pos_idx: pos_idx[0] - pos_idx[1]):
            idxs = list(map(operator.itemgetter(1), group))
            start_idx, end_idx = idxs[0], idxs[-1] + 1
            value[start_idx:end_idx] = utils.natural_sort(value[start_idx:end_idx])
    else:
        # direct lookup rather than scanning every (name, separator) pair
        separator = separators.get(name)
        if separator is not None and isinstance(value, str) and separator in value:
            value = separator.join(utils.natural_sort(value.split(separator)))

    return FunctionArg(name, value)
# Matches version-4 UUIDs (the version nibble must be '4' and the variant
# nibble one of 8, 9, a, b), with or without hyphens, in either letter case.
_uuid_regex = re.compile(
    r'[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}',
    re.IGNORECASE,
)
def UUID_to_underscore(argument):
    """
    Return an argument with any UUIDs in its value replaced by '<UUID>'.

    Despite the function name, matches are replaced by the literal text
    '<UUID>', not by underscores. Only string values of named arguments are
    processed; a NamelessArg, or a FunctionArg whose value is not a string, is
    returned unchanged (the same object).

    :param argument: the FunctionArg or NamelessArg to process
    :return: a new FunctionArg with UUIDs masked, or the original argument
    """
    if isinstance(argument, NamelessArg):
        return argument

    if not isinstance(argument.value, str):
        return argument

    # Strings are immutable and a new FunctionArg is returned, so the original
    # argument is never modified and no defensive copy is needed.
    return FunctionArg(argument.name, _uuid_regex.sub('<UUID>', argument.value))
def truncate_paths(arg):
    """
    Return an argument whose path value(s) are reduced to their final path
    component.

    Only named arguments whose name identifies a path (vis, caltable,
    gaintable, imagename, ...) are processed. Any other argument, and every
    NamelessArg, is returned unchanged. Paths nested in lists/tuples are
    processed too (nested containers come back as lists).

    Trailing path separators are ignored, so 'data/uid.ms/' becomes 'uid.ms'.
    os.path.basename alone would give '' there. A path made only of
    separators (e.g. '/') is kept as it is.

    :param arg: the FunctionArg or NamelessArg to process
    :return: a new FunctionArg with truncated paths, or the original argument
    """
    # Path arguments are kw args with specific identifiers. Exit early if this
    # is not a path argument
    if isinstance(arg, NamelessArg):
        return arg
    if arg.name not in ('vis', 'caltable', 'gaintable', 'asdm', 'outfile', 'figfile', 'listfile', 'inpfile',
                        'plotfile', 'fluxtable', 'infile', 'infiles', 'mask', 'imagename', 'fitsimage',
                        'outputvis'):
        return arg

    def _last_component(path):
        # Never return '' for a non-empty path. Besides being uninformative,
        # an empty result made the recursive map descend into the string
        # character by character.
        return os.path.basename(path.rstrip(os.sep)) or path

    # wrap value in a tuple so that strings can be interpreted by
    # the recursive map function
    basename_value = _recur_map(_last_component, (arg.value,))[0]
    return FunctionArg(arg.name, basename_value)
def _recur_map(fn, data):
    """
    Apply fn to every string in a (possibly nested) iterable.

    Strings are passed to fn and replaced by its result, even when that
    result is empty. Other iterables are processed recursively and become
    lists. Non-iterable items (numbers, booleans, ...) are kept unchanged.

    :param fn: callable taking a string and returning its replacement
    :param data: iterable of strings, nested iterables and/or scalars
    :return: a list mirroring the structure of data
    """
    mapped = []
    for item in data:
        if isinstance(item, str):
            # Use fn's result unconditionally. Falling through to the
            # recursive branch for an empty result would iterate the string's
            # characters, and a one-character string such as '/' would then
            # recurse forever.
            mapped.append(fn(item))
            continue
        try:
            nested = iter(item)
        except TypeError:
            # scalar value: nothing to map
            mapped.append(item)
        else:
            mapped.append(_recur_map(fn, nested))
    return mapped
class JobRequest(object):
    """
    Encapsulate a function or CASA task call, together with its arguments,
    so that the call can be logged, hashed and executed.
    """

    def __init__(self, fn, *args, **kw):
        """
        Create a new JobRequest that encapsulates a function call and its
        associated arguments and keywords.

        Keyword arguments with a value of None or an empty string are
        discarded so that the callee's default applies. Keyword arguments the
        callee does not declare are discarded with a warning.

        :param fn: the function, or CASA task instance, to call
        :param args: positional arguments for the call
        :param kw: keyword arguments for the call
        """
        # remove any keyword arguments that have a value of None or an empty
        # string, letting CASA use the default value for that argument
        null_keywords = [k for k, v in kw.items() if v is None or (isinstance(v, str) and not v)]
        for key in null_keywords:
            kw.pop(key)

        self.fn = fn

        fn_name, is_casa_task = get_fn_name(fn)
        self.fn_name = fn_name

        if is_casa_task:
            # CASA tasks are instances rather than functions, whose execution
            # begins at __call__.
            fn = fn.__call__

        # the next piece of code does some introspection on the given function
        # so that we can find out the complete invocation, adding any implicit
        # or defaulted argument values to those arguments explicitly given. We
        # use this information if execute(verbose=True) is specified.
        code = fn.__code__
        argcount = code.co_argcount
        positional_names = code.co_varnames[:argcount]
        # keyword-only parameters directly follow the positional ones
        kwonly_names = code.co_varnames[argcount:argcount + code.co_kwonlyargcount]

        # default values: the trailing positional parameters take
        # __defaults__, keyword-only parameters take __kwdefaults__
        fn_defaults = fn.__defaults__ or ()
        argdefs = {}
        if fn_defaults:
            argdefs.update(zip(positional_names[-len(fn_defaults):], fn_defaults))
        argdefs.update(fn.__kwdefaults__ or {})

        if inspect.ismethod(fn):
            # A bound method (such as a CASA task's __call__) supplies its
            # first parameter (self) implicitly. That parameter must not be
            # matched against the positional arguments given here, or every
            # positional argument would be logged under the wrong name.
            positional_names = positional_names[1:]

        expected_names = positional_names + kwonly_names

        # remove arguments that are not expected by the function, such as
        # pipeline variables that the CASA task is not expecting.
        unexpected_kw = [k for k in kw if k not in expected_names]
        if unexpected_kw:
            LOG.warning('Removing unexpected keywords from JobRequest: {!s}'.format(unexpected_kw))
            for key in unexpected_kw:
                kw.pop(key)

        self.args = args
        self.kw = kw

        self._positional = [FunctionArg(name, arg) for name, arg in zip(positional_names, args)]
        # Parameters given neither positionally nor by keyword that have a
        # default value. A required parameter left unset has no value to
        # report; the call itself will raise for it.
        self._defaulted = [FunctionArg(name, argdefs[name])
                           for name in positional_names[len(args):] + kwonly_names
                           if name not in kw and name in argdefs and name != 'self']
        self._keyword = [FunctionArg(name, kw[name]) for name in expected_names if name in kw]
        # positional arguments beyond the named parameters (i.e. for *args)
        self._nameless = [NamelessArg(a) for a in args[len(positional_names):]]

    def execute(self, dry_run=False, verbose=False):
        """
        Execute this job, returning any result to the caller.

        :param dry_run: True if the job should be logged rather than executed
            (default: False)
        :type dry_run: boolean
        :param verbose: True if the complete invocation, including all default
            variables and arguments, should be logged instead of just those
            explicitly given (default: False)
        :type verbose: boolean
        :return: the result of the wrapped call, or None for a dry run
        """
        msg = self._get_fn_msg(verbose, sort_args=False)
        if dry_run:
            sys.stdout.write('Dry run: %s\n' % msg)
            return

        for hook in PREHOOKS:
            hook(self)

        LOG.info('Executing %s' % msg)

        # log sorted arguments to facilitate easier comparisons between
        # pipeline executions
        sorted_msg = self._get_fn_msg(verbose=False, sort_args=True)
        CASACALLS_LOG.debug(sorted_msg)

        try:
            return self.fn(*self.args, **self.kw)
        finally:
            # post-hooks run even when the call raises
            for hook in POSTHOOKS:
                hook(self)

    def _get_fn_msg(self, verbose=False, sort_args=False):
        """
        Return this call formatted as text, e.g. "fn(vis='a.ms', spw='0')".

        :param verbose: also include parameters left at their default values
        :param sort_args: also sort argument values (alphasort) and mask UUIDs
            (UUID_to_underscore); execute() uses this for CASACALLS_LOG
        :return: the formatted call
        """
        if verbose:
            args = self._positional + self._defaulted + self._nameless + self._keyword
        else:
            args = self._positional + self._nameless + self._keyword

        processed = [truncate_paths(arg) for arg in args]
        if sort_args:
            processed = [alphasort(arg) for arg in processed]
            processed = [UUID_to_underscore(arg) for arg in processed]

        string_args = [str(arg) for arg in processed]
        return '{!s}({!s})'.format(self.fn_name, ', '.join(string_args))

    def __repr__(self):
        return 'JobRequest({!r}, {!r})'.format(self.args, self.kw)

    def __str__(self):
        return self._get_fn_msg(verbose=False, sort_args=False)

    def hash_code(self, ignore=None):
        """
        Get the numerical hash code for this JobRequest.

        The code is computed from the keyword arguments only. It should be -
        but is not guaranteed to be - unique. It is built on hash(), so values
        can differ between interpreter processes (str hashing is randomized by
        default).

        :param ignore: names of keyword arguments to leave out of the hash
        :return: the hash code
        """
        if ignore is None:
            ignore = []
        to_match = {k: v for k, v in self.kw.items() if k not in ignore}
        return self._gen_hash(to_match)

    def _gen_hash(self, o):
        """
        Make a hash from a dictionary, list, tuple or set to any level, that
        contains only other hashable types (including any lists, tuples,
        sets, and dictionaries).

        Lists and tuples are order-sensitive. Sets and dictionaries are not,
        so equal containers always hash alike. Sequences and sets are reduced
        to hashable tuples/frozensets; dictionaries and other values to an int.
        """
        if isinstance(o, (list, tuple)):
            return tuple(self._gen_hash(e) for e in o)
        if isinstance(o, (set, frozenset)):
            # a set's iteration order is not part of its value
            return frozenset(self._gen_hash(e) for e in o)
        if not isinstance(o, dict):
            return hash(o)

        # build a new mapping instead of deep-copying and mutating the input
        hashed_items = {k: self._gen_hash(v) for k, v in o.items()}
        # hash the frozenset directly: converting it to a tuple first would
        # make the result depend on the set's iteration order
        return hash(frozenset(hashed_items.items()))
def get_fn_name(fn, modules=None):
    """
    Return a tuple stating the name of the function and whether the function
    is a CASA task.

    fn counts as a CASA task when it is found (by ==) among the attributes of
    one of the searched modules. Its name is then the attribute name it is
    exposed under, taken from the first match in module order. Otherwise the
    name is fn.__name__.

    :param fn: the function to inspect
    :param modules: modules to search; defaults to
        (almatasks, casatasks, casaplotms)
    :return: (function name, bool) tuple
    """
    #
    # PIPE-697: uvcontfit and copytree commands now appear erroneously as
    # casaplotms in casa_commands.log
    #
    # The pipeline has a handful of shutil file operations wrapped up in
    # JobRequests and exposed on the casatasks module so that they can be
    # called and logged in the same manner as CASA operations. The check
    # below distinguishes CASA tasks/functions from non-CASA code.
    #
    if modules is None:
        modules = (almatasks, casatasks, casaplotms)

    for module in modules:
        for attr_name, attr_value in module.__dict__.items():
            if attr_value == fn:
                return attr_name, True

    return fn.__name__, False