Source code for pipeline.infrastructure.vdp

"""
vdp is a pipeline framework module that contains classes to make writing task
Inputs easier.

- InputsContainer lets task implementations operate within the scope of a
  single measurement set, even if the pipeline run contains multiple data
  sets.

- VisDependentProperty is a reworking of pipeline properties to reduce the
  amount of boilerplate code required to implement an Inputs class.
    
Implementation details:

    See the documentation on the classes, particularly VisDependentProperty,
    for detailed information on how the framework operates. 
    
Examples:

    There are three common scenarios that use VisDependentProperty. The
    following examples show each scenario for an Inputs property belonging to
    an Inputs class that extends vdp.StandardInputs.
    
    1. To provide a default value that can be overridden on a per-MS basis. Use
       the optional 'default' argument to VisDependentProperty, eg: 
    
        myarg = VisDependentProperty(default='some value')
        
    2. For more sophisticated default values, e.g., a default value that is a
       function of other data or properties, use the @VisDependentProperty
       decorator. A class property decorated with @VisDependentProperty should
       return the default value for that property. The function will execute
       in the scope of a single measurement set, i.e., at the time it is
       called, vis is set to exactly one value. The function will be called to
       provide a default value for any measurement set that does not have a
       user override value.

        @VisDependentProperty
        def myarg(self):
            # do some processing then return the calculated value
            return 'I am a custom property for measurement set %s' % self.vis

    3. Convert or validate user input before accepting it as an Inputs argument.
       Use the @VisDependentProperty.convert decorator, possibly alongside the
       getter decorator as above.
       
        @VisDependentProperty
        def myarg(self):
            # this will return 100 - but only if the user has not supplied
            # an override value!
            return 100

        @VisDependentProperty.convert
        def myarg(self, user_input):
            # convert then return the user input which is provided as an
            # argument to the convert decorator. The converted value will be
            # returned for all subsequent 'gets'.
            return int(user_input)
    
"""
import collections
import copy
import inspect
import numbers
import os
import pprint

from . import api
from . import argmapper
from . import launcher
from . import logging
from . import task_registry
from . import utils

# Names exported by a star-import of this module.
__all__ = ['VisDependentProperty',
           'InputsContainer',
           'StandardInputs',
           'ModeInputs']

# Module-level logger, created through the package-local logging module
# imported above (not the standard library logging module).
LOG = logging.get_logger(__name__)


class SingletonType(type):
    """
    SingletonType is a metaclass that ensures that only a single instance of
    a class exists. Creating additional instances returns the existing
    instance.

    The instance is cached per class, so a subclass of a singleton class gets
    its own single instance rather than the instance of its base class.
    Constructor arguments are only used the first time the class is called;
    they are ignored on subsequent calls.
    """

    def __call__(cls, *args, **kwargs):
        # Look in the class's own namespace rather than using attribute
        # lookup: attribute lookup would also find an instance cached on a
        # base class and hand a subclass an object of the wrong type.
        try:
            return vars(cls)['_SingletonType__instance']
        except KeyError:
            # name mangling stores this as cls._SingletonType__instance
            cls.__instance = super(SingletonType, cls).__call__(*args, **kwargs)
            return cls.__instance


class NullMarker(object):
    """
    NullMarker is a class that represents the null "parameter not-set" case.
    It exists to distinguish between a user-provided null value, such as None
    or '', and an argument that is null because it has not been set.

    All NullMarker instances compare equal to each other, regardless of the
    null_input they were created with, and therefore share one hash value.
    """
    def __init__(self, null_input):
        """
        Values equal to those in null_input will be considered equal to null.

        :param null_input: user inputs considered equivalent to a NullMarker.
        :type null_input: iterable
        """
        self.null_input = tuple(null_input)

    def convert(self, val):
        """
        Process the argument, converting user input considered equivalent to
        null to a NullMarker object.

        :param val: the user input to inspect
        :return: this NullMarker if val is a NullMarker or equals one of the
            null_input values, otherwise val unchanged
        """
        if isinstance(val, NullMarker) or val in self.null_input:
            return self
        return val

    def __eq__(self, other):
        return isinstance(other, NullMarker)

    def __ne__(self, other):
        return not isinstance(other, NullMarker)

    def __hash__(self):
        # Defining __eq__ without __hash__ makes instances unhashable. Equal
        # objects must hash equal, and all NullMarkers are equal, so every
        # instance uses the same constant hash.
        return hash(NullMarker)

    def __str__(self):
        # null_input usually contains non-string values such as None, so each
        # value must be converted to a string before joining.
        return 'NullMarker({!s})'.format(','.join(str(v) for v in self.null_input))


# Shared NullMarker instance that treats None and the empty string as null.
# Very few inputs require custom null equivalents, so properties that do not
# specify their own null_input reuse this one object.
_NULL = NullMarker(null_input=[None, ''])


class NoDefaultMarker(object, metaclass=SingletonType):
    """
    Marker for "no default value has been configured".

    A default of None is a legitimate, user-specified default, so None cannot
    be used to signal that a default is missing. This marker fills that role.
    A NoDefaultMarker is equal to any other NoDefaultMarker and unequal to
    everything else.
    """

    def __eq__(self, other):
        other_is_marker = isinstance(other, NoDefaultMarker)
        return other_is_marker

    def __ne__(self, other):
        if isinstance(other, NoDefaultMarker):
            return False
        return True


class PipelineInputsMeta(type):
    """
    Metaclass that tells each VisDependentProperty declared in a class body
    which attribute name it was assigned to.
    """
    def __new__(mcls, name, bases, attrs):
        new_cls = super(PipelineInputsMeta, mcls).__new__(mcls, name, bases, attrs)
        # only the attributes declared directly in this class body are
        # inspected; inherited descriptors were named when their own class
        # was created
        declared_vdps = ((key, value) for key, value in attrs.items()
                         if isinstance(value, VisDependentProperty))
        for key, vdp in declared_vdps:
            vdp.__set_name__(new_cls, key)
        return new_cls


class VisDependentProperty(object):
    """
    VisDependentProperty is a Python data descriptor that standardises the
    behaviour of pipeline Inputs properties and lets them create default
    values more easily.

    On reading a VisDependentProperty (ie. using the dot prefix:
    inputs.solint), the following happens:

    1. If a NullMarker is found - signifying that no user input has been
       provided - and a 'getter' function has been defined, the getter
       function will be called to provide a default value for that
       measurement set. If no getter function was defined, the static
       'default' value is used; if neither exists a ValueError is raised.

    2. If a user has overridden the value (eg. inputs.solint = 123), that
       value will be retrieved.

    3. The value, either the default from step 1 or user-provided from step
       2, is run through the optional postprocess function, which gives a
       final opportunity to change the value depending on the state/value of
       other properties.

    A VisDependentProperty can be made read-only by specifying
    'readonly=True' when creating the instance.

    A VisDependentProperty can be hidden from the containing Inputs string
    representation by setting 'hidden=True' when creating the instance. This
    will hide the property from the web log and CLI getInputs calls.

    Each VisDependentProperty has a set of values it considers equivalent to
    null. When the user sets the VDP value to one of these null values, the
    VDP machinery converts this to a private NullMarker object that signifies
    the property is now unset, resulting in the default value being returned
    next time the property is read. Developers can specify which values
    should be converted to NullMarker by specifying null_input at creation
    time, e.g.,

        solint = VisDependentProperty(default=5, null_input=[None, '', 'RESET', -1])
    """

    # TODO check whether this can be replaced with NULL
    NO_DEFAULT = NoDefaultMarker()

    @property
    def backing_store_name(self):
        """
        The name of the attribute holding the value for this property.
        """
        return '_' + self.name

    def convert(self, fconvert):
        """
        Set the function used to clean and/or convert user-supplied argument
        values before they are associated with the instance property.

        The provided function is called with the owning Inputs instance and
        the user input *for this measurement set*. That is, after potentially
        being divided up into per-measurement values.

        :param fconvert: the conversion function
        :return: a new descriptor of the same type using fconvert
        """
        # readonly is passed on so that decorating a read-only property does
        # not silently make it writable
        return type(self)(self.fdefault, fconvert, self.fpostprocess, default=self.default,
                          readonly=self.readonly, hidden=self.hidden, null_input=self.null_input)

    # NOTE: __init__ stores the static default value in an instance attribute
    # that is also called 'default'. On an instance, that attribute shadows
    # this method, so the method is only reachable through the class, e.g.
    # VisDependentProperty.default(prop, fn).
    def default(self, fdefault):
        """
        Set the function used to get the attribute value when the user has not
        supplied an override value.

        :param fdefault: function called with the owning Inputs instance
        :return: a new descriptor of the same type using fdefault
        """
        return type(self)(fdefault, self.fconvert, self.fpostprocess, readonly=self.readonly,
                          hidden=self.hidden, null_input=self.null_input)

    def fget(self, owner):
        """
        Gets the underlying property value from an instance of the class
        owning this property

        :param owner: the instance holding the backing store attribute
        :return: the stored value, or this property's NullMarker if no value
            has been stored on the instance
        """
        return getattr(owner, self.backing_store_name, self.null)

    def fset(self, owner, value):
        """
        Sets the property value on the instance owning this property.

        :param owner: the instance to hold the backing store attribute
        :param value: the value to store
        :return: None
        """
        setattr(owner, self.backing_store_name, value)

    def postprocess(self, fpostprocess):
        """
        Set the function used to process the value that is about to be
        returned. This allows the value to be modified or perhaps a different
        value based on another property to be returned.

        :param fpostprocess: function called with the owning Inputs instance
            and the value about to be returned
        :return: a new descriptor of the same type using fpostprocess
        """
        return type(self)(self.fdefault, self.fconvert, fpostprocess, default=self.default,
                          readonly=self.readonly, hidden=self.hidden, null_input=self.null_input)

    def __init__(self, fdefault=None, fconvert=None, fpostprocess=None, default=NO_DEFAULT, readonly=False,
                 hidden=False, null_input=None):
        self.fdefault = fdefault
        self.fconvert = fconvert
        self.fpostprocess = fpostprocess
        self.default = default
        self.readonly = readonly
        self.hidden = hidden
        self.null_input = null_input

        # use shared NullMarker instance where possible. There are lots of
        # Inputs properties, and only a handful require something custom.
        if null_input is None:
            self.null = _NULL
        else:
            self.null = NullMarker(null_input=null_input)

    def __call__(self, fdefault, *args, **kwargs):
        # __call__ is executed when the descriptor was created with arguments
        # and is then used as a decorator, e.g.
        # @VisDependentProperty(readonly=True). The decorated function
        # becomes the default-value getter.
        self.fdefault = fdefault
        return self

    def __get__(self, instance, owner):
        # Return the VisDependentProperty itself when accessed on the class
        if instance is None:
            return self

        instance_val = self.fget(instance)
        if instance_val == self.null:
            # no user value: the getter function takes precedence over the
            # static default
            if self.fdefault:
                instance_val = self.fdefault(instance)
            elif self.default != VisDependentProperty.NO_DEFAULT:
                instance_val = self.default
            else:
                raise ValueError('Cannot get property with no default and no user value')

        if self.fpostprocess:
            return self.fpostprocess(instance, instance_val)
        return instance_val

    def __set__(self, instance, value):
        if self.readonly:
            raise AttributeError("can't set read-only attribute: {!s}".format(self.name))

        value = self.null.convert(value)

        # pass non-null values through the user-provided converter
        converted = value
        if self.fconvert is not None and value != self.null:
            converted = self.fconvert(instance, value)

        self.fset(instance, converted)

    def __set_name__(self, owner, name):
        # the name determines backing_store_name and appears in error messages
        self.name = name
class InputsContainer(object):
    """
    InputsContainer is the top-level container object for all task Inputs.

    InputsContainer contains machinery to let Inputs classes operate purely in
    the scope of a single measurement set, to make both Inputs and Task
    implementation much simpler.

    The InputsContainer operates in the scope of multiple measurement sets,
    and holds one Inputs instance for every measurement set within the
    context. At task execution time, the task is executed for each active
    Inputs instance. Not all the Inputs instances held by the InputsContainer
    need be active: the user can reduce the scope of the task to a subset of
    measurement sets by setting vis, which makes an Inputs instance hidden and
    inactive.

    Tasks that operate in the scope of more than one measurement set, e.g,
    imaging and session-aware tasks, can disable the InputsContainer machinery
    by setting is_multi_vis_task to True. For these multivis tasks, one Inputs
    instance is held in an InputsContainer, but all property sets and gets
    pass directly through the one underlying inputs instance.

    For tasks that operate in the scope of a single measurement set, the
    InputsContainer class works in conjunction with VisDependentProperty to
    provide and process user input (eg. inputs.solint = 123) according to a
    set of rules:

    1. If the input is scalar and equal to '' or None, all measurement sets
       will be mapped back to NullMarker, therefore returning the default
       value or custom getter function on subsequent access.

    2. If the input is a list with number of items equal to the number of
       measurement sets, the items will be divided up and treated as mapping
       one value per measurement set.

    3. Otherwise, the user input is considered as the new default value for
       all measurement sets.

    Before the user input is stored in the dictionary, however, the input is
    passed through the convert function, assuming one has been provided. The
    convert function allows the developer to validate or convert user input
    to a standard format before accepting it as a new argument.
    """

    # Names of the attributes that belong to the container itself. They are
    # stored in the container's __dict__ and are never delegated to the
    # contained Inputs instances.
    _PRIVATE_ATTRS = ('_context', '_task_cls', '_cls_instances', '_active_instances', '_scope_attr',
                      '_initargs', '_multivis', '_current_inputs_cls')

    def __init__(self, task_cls, context, *args, **kwargs):
        """
        Create Inputs instances of task_cls.Inputs for the requested data.

        :param task_cls: the task class whose Inputs should be created
        :param context: the pipeline context
        :param args: positional arguments for the Inputs constructor
        :param kwargs: keyword arguments for the Inputs constructor
        :raises KeyError: if a ModeInputs constructor has no usable mode
        :raises AttributeError: if the Inputs constructor has neither a 'vis'
            nor an 'infiles' argument
        """
        self._context = context
        self._task_cls = task_cls
        self._initargs = kwargs.copy()

        # For ModeInputs, the scope attribute is defined by one of the classes
        # that the ModeInputs switches between, rather than the ModeInputs
        # constructor itself.
        if issubclass(task_cls.Inputs, ModeInputs):
            if 'mode' not in kwargs:
                # user intends for the class to use the default mode. Inspect
                # the constructor to find what that value is.
                constructor = task_cls.Inputs.__init__
                code = constructor.__code__
                argcount = code.co_argcount
                argnames = code.co_varnames[:argcount]
                fn_defaults = constructor.__defaults__ or list()
                argdefs = dict(zip(argnames[-len(fn_defaults):], fn_defaults))
                if 'mode' not in argdefs:
                    raise KeyError('The mode attribute must be specified for ModeInputs constructors')
                mode = argdefs['mode']
            else:
                mode = kwargs['mode']
            current_inputs_cls = task_cls.Inputs._modes[mode].Inputs
        else:
            current_inputs_cls = task_cls.Inputs

        # to support ModeInputs we introduce a level of indirection: this
        # variable points to the Inputs class that code should inspect
        self._current_inputs_cls = current_inputs_cls

        # Inspect the targeted Inputs constructor specification to find out
        # which attribute sets the scope (=the data sets to process) for that
        # task.
        #
        # note: _scope_attr must be set for __setattr__ to function correctly
        #
        # getfullargspec is used as inspect.getargspec no longer exists in
        # recent Python 3 releases
        constructor_spec = inspect.getfullargspec(current_inputs_cls.__init__)
        if 'vis' in constructor_spec.args:
            self._scope_attr = 'vis'
        elif 'infiles' in constructor_spec.args:
            self._scope_attr = 'infiles'
        else:
            msg = 'No scope argument recognised in {!s} constructor'.format(task_cls.Inputs.__name__)
            raise AttributeError(msg)

        # all arguments must be of key/value type for processing
        named_args = name_all_arguments(current_inputs_cls.__init__, context, *args, **kwargs)

        # if no datasets are specified, set the dataset scope to all those
        # registered with the context
        try:
            scope_property = getattr(current_inputs_cls, self._scope_attr)
            scope_null = scope_property.null
        except AttributeError:
            # the scope attribute is missing from the class or is not a
            # VisDependentProperty, so fall back to the shared null marker
            scope_null = _NULL
        scope_value = named_args.get(self._scope_attr, scope_null)
        scope_is_null = scope_null == scope_null.convert(scope_value)
        if scope_is_null:
            # NOTE(review): basetask is not referenced below; the import is
            # kept in case it is needed for its import-time side effects.
            from . import basetask
            # note that for ModeInputs this queries whether the ModeInputs is
            # registered for imaging MSes, not the Inputs that is selected.
            imaging_preferred = issubclass(self._task_cls.Inputs, api.ImagingMeasurementSetsPreferred)
            ms_pool = self._context.observing_run.get_measurement_sets(imaging_preferred=imaging_preferred)
            named_args[self._scope_attr] = [ms.name for ms in ms_pool]

        # multi-vis tasks do not require any further processing
        from . import sessionutils
        # CAS-11443. ParallelTaskTemplate is a multivis task, in the sense that
        # one instance of ParallelTaskTemplate executes for all MSes, but the
        # Inputs should be treated as non-multivis so that the input arguments
        # for HPC tasks can be mapped to MSes using the same VDP logic as for
        # non-HPC tasks.
        self._multivis = task_cls.is_multi_vis_task and not issubclass(task_cls, sessionutils.ParallelTemplate)
        if self._multivis:
            self._cls_instances = {'all': task_cls.Inputs(**named_args)}
            self._active_instances = list(self._cls_instances.values())
            return

        # assign constructor args to data sets
        constructor_args = self._remap_constructor_args(**named_args)

        try:
            # create dict of <data name>: Inputs instance
            self._cls_instances = collections.OrderedDict(
                (os.path.basename(kw[self._scope_attr]), task_cls.Inputs(**kw)) for kw in constructor_args)
        except TypeError:
            # catch TypeError exceptions from unexpected keyword arguments
            # so that we can add some more context to the debug message
            LOG.error('Error creating {!s}'.format(task_cls.Inputs.__name__))
            raise

        self._active_instances = list(self._cls_instances.values())

    def _remap_constructor_args(self, **kwargs):
        """
        Split constructor arguments into one keyword dict per data set.

        :param kwargs: named constructor arguments, including the scope
            argument
        :return: list of dicts, one per data set
        :raises AttributeError: if the scope value is neither a string nor a
            list
        """
        # find out how many datasets are in this call. This number is used to
        # multiply scalar values appropriately
        scope_val = kwargs[self._scope_attr]
        if isinstance(scope_val, str):
            scope_val = scope_val.split(',')
        if not isinstance(scope_val, list):
            msg = 'Illegal format for {!s}: {!r}'.format(self._scope_attr, scope_val)
            raise AttributeError(msg)
        num_datasets = len(scope_val)

        # Process each argument value. In the resulting dict, each key is an
        # argument name with values containing one value per dataset
        processed = {k: self._process_arg_val(num_datasets, k, v) for k, v in kwargs.items()}

        # Split the dict so that we have n dicts for n datasets, with each
        # dict containing just the values for that data.
        return [{k: v[i] for k, v in processed.items()} for i in range(num_datasets)]

    def _process_arg_val(self, num_datasets, name, val):
        """
        Expand one argument value into a sequence with one entry per data set.
        """
        if name == self._scope_attr:
            if isinstance(val, str):
                return val.split(',')
            return val

        # n values, n vis => distribute values
        if isinstance(val, (list, tuple)) and len(val) == num_datasets:
            return val

        # n values, m vis => assign value to all
        return [val for _ in range(num_datasets)]

    @property
    def _pipeline_casa_task(self):
        """
        The CASA task call equivalent to these inputs, as a string, or None
        if the task class does not map to exactly one CASA task.
        """
        # Map the Inputs class to the hif* equivalent. Note that registry
        # maps Task classes, not Inputs classes, to their CASA task equivalent.
        # However, Task.Inputs *does* point to an Inputs class so we can
        # compare self against that.
        casa_tasks = [m.casa_task for m in task_registry.task_map
                      if m.pipeline_class.Inputs == self._task_cls.Inputs]
        # compare by value: identity comparison with an int literal is not
        # guaranteed to work
        if len(casa_tasks) != 1:
            return None

        # map Python Inputs arguments back to their CASA equivalent
        remapped = argmapper.inputs_to_casa(self._task_cls, self._initargs)

        # CAS-6299. Extra request from Liz:
        #
        # "the full directory path of the ASDM location is given from the Pipeline
        # observatory run, so a PI/DRMs would have to edit this. Could it be
        # replaced just by the name of the ASDM/ASDMs?"
        #
        # this means we have to take the basename of the vis argument for the
        # importdata calls
        if '_importdata' in casa_tasks[0]:
            key = self._scope_attr
            remove_slash = lambda x: x.rstrip('/')
            if isinstance(remapped[key], str):
                remapped[key] = os.path.basename(remove_slash(remapped[key]))
            else:
                remapped[key] = [os.path.basename(remove_slash(v)) for v in remapped[key]]

        task_args = ['%s=%r' % (k, v) for k, v in remapped.items()
                     if k not in ['self', 'context'] and v is not None]

        # work around CASA problem with globals when no arguments are specified
        if not task_args:
            task_args = ['pipelinemode="automatic"']

        casa_call = '%s(%s)' % (casa_tasks[0], ', '.join(task_args))
        return casa_call

    def __len__(self):
        return len(self._active_instances)

    def __iter__(self):
        return iter(self._active_instances)

    def __getitem__(self, index):
        # integers index the active instances; strings look up any instance,
        # active or not, by measurement set name
        cls = type(self)
        if isinstance(index, numbers.Integral):
            return self._active_instances[index]
        elif isinstance(index, str):
            return self._cls_instances[index]
        else:
            msg = '{cls.__name__} indices must either be integers or the name of a measurement set'
            raise TypeError(msg.format(cls=cls))

    def __members__(self):
        # NotImplemented is a comparison sentinel, not an exception; raising
        # it results in a TypeError
        raise NotImplementedError

    def __methods__(self):
        raise NotImplementedError

    def __getattr__(self, name):
        # __getattr__ is only called when normal attribute lookup fails.
        # When this happens, we delegate to the currently selected Inputs.
        # First, however, we check whether the requested property is one of
        # the Python magic methods. If so, we report it as missing. This is
        # necessary for pickle, which checks for __getnewargs__ and
        # __getstate__.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError('{!s} has no attribute {!r}'.format(type(self).__name__, name))

        # The container's own attributes are never delegated. They can be
        # missing while an instance is under construction, eg. during copy or
        # unpickling; as the code below reads them, failing here prevents
        # infinite recursion.
        if name in self._PRIVATE_ATTRS:
            raise AttributeError('{!s} has no attribute {!r}'.format(type(self).__name__, name))

        # the name is defined on this class but normal lookup still failed,
        # eg. because a property getter raised AttributeError
        if name in dir(self):
            raise AttributeError('{!s} has no attribute {!r}'.format(type(self).__name__, name))

        if name == self._scope_attr:
            LOG.trace('Retrieving scope from {!s}.{!s}'.format(self._task_cls.Inputs.__name__, name))
            return self._get_scope()

        LOG.trace('InputsContainer.{!s}: delegating to {!s}'.format(name, self._task_cls.Inputs.__name__))
        result = [getattr(i, name) for i in self._active_instances]
        return format_value_list(result)

    def __setattr__(self, name, val):
        # If the property we're trying to set is one of this base class's
        # private variables, add it to our __dict__ using the standard
        # __setattr__ method
        if name in self._PRIVATE_ATTRS:
            if LOG.isEnabledFor(logging.TRACE):
                LOG.trace('Setting {!s}.{!s} = {!r}'.format(self.__class__.__name__, name, val))
            return super(InputsContainer, self).__setattr__(name, val)

        # setting the scope changes which Inputs instances are active
        if name == self._scope_attr:
            if LOG.isEnabledFor(logging.TRACE):
                LOG.trace('Setting scope: {!s}.{!s} = {!r}'.format(self.__class__.__name__, name, val))
            return self._set_scope(val)

        # everything else is set on the active Inputs instances
        for inputs, user_arg in self._map_args_to_vis(val):
            if LOG.isEnabledFor(logging.TRACE):
                LOG.trace('Setting {!s}.{!s} = {!r}'.format(inputs.__class__.__name__, name, user_arg))
            setattr(inputs, name, user_arg)

        if name == 'mode' and issubclass(self._task_cls.Inputs, ModeInputs):
            # can only have one active class at a time, so it's safe to take
            # the first instance
            new_cls = self._active_instances[0]._active.__class__
            LOG.trace('Setting VDP target inputs class to {!s}'.format(new_cls.__name__))
            self._current_inputs_cls = new_cls

    def _map_args_to_vis(self, val):
        """
        Distribute an incoming user argument amongst the active instances.

        :param val: a list or tuple holding one value per active instance, or
            any other value to be given to every active instance
        :return: list of (Inputs instance, value) tuples
        """
        if not isinstance(val, (list, tuple)):
            val = [val] * len(self._active_instances)
        return list(zip(self._active_instances, val))

    def _get_scope(self):
        scope = [getattr(i, self._scope_attr) for i in self._active_instances]
        return format_value_list(scope)

    def _set_scope(self, scope):
        # for multivis tasks, all we should do is set vis on the contained
        # inputs. As there is only ever one active inputs instance, no further
        # processing is required so we can exit.
        if self._multivis:
            setattr(self._active_instances[0], self._scope_attr, scope)
            return

        # reset to all MSes if the input arg signals a reset, which expands
        # task scope to all MSes
        if _NULL.convert(scope) == _NULL:
            scope = list(self._cls_instances.keys())

        # the key for inputs instances is the basename vis
        if isinstance(scope, str):
            scope = [scope]
        basenames = [os.path.basename(v) for v in scope]

        #
        # TODO: the code below should be removed after constructor refactoring
        #
        # create a new inputs instance if one cannot be found
        for basename, path in zip(basenames, scope):
            if basename in self._cls_instances:
                continue
            kwargs = {self._scope_attr: path}
            self._cls_instances[basename] = self._task_cls.Inputs(self._context, **kwargs)

        # set the task scope to the vises set here
        self._active_instances = [self._cls_instances[n] for n in basenames]

    def __repr__(self):
        return '<InputsContainer({!s}, {!r})>'.format(self._task_cls, self._context.name)

    def __str__(self):
        if not self._active_instances:
            return 'Empty'

        attrs = {name for name in dir(self._current_inputs_cls) if not name.startswith('_')}
        # We should also print the mode for ModeInputs
        if issubclass(self._task_cls.Inputs, ModeInputs):
            attrs.add('mode')

        # isroutine rather than ismethod: functions retrieved from a class are
        # plain functions in Python 3, which ismethod does not recognise
        methods = {fn_name for fn_name, _ in inspect.getmembers(self._current_inputs_cls, inspect.isroutine)}
        props = attrs - methods
        props.discard('context')
        props.discard('ms')

        vals = {k: getattr(self, k) for k in props}
        return pprint.pformat(vals)

    def as_dict(self):
        """
        Merge the as_dict() output of every active Inputs instance into one
        dict.

        A key absent from one instance's dict contributes an empty list for
        that instance. The per-instance values for each key are then reduced
        with format_value_list.
        """
        properties = {}
        input_dicts = [i.as_dict() for i in self._active_instances]
        all_keys = {key for d in input_dicts for key in d}
        for key in all_keys:
            vals = [d.get(key, []) for d in input_dicts]
            properties[key] = format_value_list(vals)
        return properties
def get_properties(inputs_cls):
    """
    Get the names of the public, non-callable attributes of an Inputs class.

    Names starting with an underscore, attributes that are functions or
    methods, and the 'context' and 'ms' attributes are excluded.

    :param inputs_cls: the class to inspect
    :return: set of attribute names
    """
    public_names = {name for name in dir(inputs_cls) if not name.startswith('_')}
    # isroutine rather than ismethod: functions retrieved from a class are
    # plain functions in Python 3, which ismethod does not recognise, so
    # regular methods would otherwise be reported as properties
    routines = {fn_name for fn_name, _ in inspect.getmembers(inputs_cls, inspect.isroutine)}
    return public_names - routines - {'context', 'ms'}
class StandardInputs(api.Inputs, metaclass=PipelineInputsMeta):
    """
    Base Inputs class providing the context, vis, ms and output_dir
    properties, plus conversion of the inputs to CASA task arguments.
    """

    # - standard non-vis-dependent properties --------------------------------

    @property
    def context(self):
        """
        Get the context used as the base for the parameters for this Inputs.

        :rtype: :class:`~pipeline.infrastructure.launcher.Context`
        :raises TypeError: if the stored context is not a pipeline Context
        """
        # A product of the prepare/analyse refactoring is that we always need
        # a context. This message is to ensure it was set by the implementing
        # subclass. We could remove this once refactoring is complete, but it
        # should be a cheap comparison so we leave it in.
        #
        # Note that the Context is NOT vis-dependent. The same context is used
        # to source all MeasurementSets.
        if not isinstance(self._context, launcher.Context):
            msg = '{!s} did not set the pipeline context'.format(self.__class__.__name__)
            raise TypeError(msg)
        return self._context

    @context.setter
    def context(self, value):
        if not isinstance(value, launcher.Context):
            msg = 'context not a pipeline context: {!s}'.format(value.__class__.__name__)
            raise TypeError(msg)
        self._context = value

    # - vis-dependent properties ---------------------------------------------

    vis = VisDependentProperty(default='', null_input=['', None, [], ['']])

    @VisDependentProperty(readonly=True, hidden=True)
    def ms(self):
        """
        Return the MeasurementSet for the current value of vis.

        If vis is a list, a list with one MeasurementSet per entry is
        returned.

        :rtype: :class:`~pipeline.domain.MeasurementSet`
        """
        if isinstance(self.vis, list):
            return [self.context.observing_run.get_ms(vis) for vis in self.vis]
        return self.context.observing_run.get_ms(self.vis)

    @VisDependentProperty
    def output_dir(self):
        # default to the output directory of the context
        return self.context.output_dir

    def _get_task_args(self, ignore=None):
        """
        Express this class as a dictionary of CASA arguments, listing all
        inputs except those named in ignore.

        The main purpose of the ignore argument is used to prevent an
        infinite loop in :meth:`~CommonCalibrationInputs.caltable`, which
        determines the value of caltable based on the value of the other CASA
        task arguments.

        :param ignore: names of arguments to omit, or None
        :return: dict of argument name to current value
        """
        if ignore is None:
            ignore = []

        # get the signature of this Inputs class. We want to return a
        # dictionary of all kw argument names except self, the
        # pipeline-specific arguments (context, output_dir, run_qa2 etc.) and
        # caltable.
        skip = ['self', 'context', 'output_dir', 'ms', 'calstate']
        skip.extend(ignore)

        # getfullargspec is used as inspect.getargspec no longer exists in
        # recent Python 3 releases
        kw_names = [a for a in inspect.getfullargspec(self.__init__).args if a not in skip]
        d = {key: getattr(self, key) for key in kw_names}

        # add any read-only properties too
        for k, v in inspect.getmembers(self.__class__, inspect.isdatadescriptor):
            if k in d or k.startswith('_') or k in skip:
                continue

            try:
                d[k] = getattr(self, k)
            except AttributeError:
                LOG.debug('Could not get input property %s' % k)

        return d

    def to_casa_args(self):
        """
        Express these inputs as a dictionary of CASA arguments. The values
        in the dictionary are in a format suitable for CASA and can be
        directly passed to the CASA task.

        Arguments whose value is None are omitted.

        :rtype: a dictionary of string/??? kw/val pairs
        """
        args = self._get_task_args()

        # spw needs to be a string and not a number
        if 'spw' in args:
            args['spw'] = str(args['spw'])

        # Handle VLA-specific arguments and peculiarities
        ms = self.ms
        if ms.antenna_array.name == 'VLA':
            # CASA believes that VLA data are not labelled with calibration
            # intent, so must remove the intent from the task call
            args['intent'] = None

        if args.get('intent', None) is not None:
            args['intent'] = utils.to_CASA_intent(ms, args['intent'])

        # iterate over a copy of the items as entries are deleted in the loop
        for k, v in list(args.items()):
            if v is None:
                del args[k]
        return args

    def __str__(self):
        # __str__ must return a string; returning the dict itself makes
        # str(inputs) raise a TypeError
        return pprint.pformat(self.as_dict())

    def __repr__(self):
        return pprint.pformat(self.as_dict())

    def as_dict(self):
        """
        Return the properties of this Inputs, as gathered by
        utils.collect_properties.
        """
        return utils.collect_properties(self)
class ModeInputs(api.Inputs, metaclass=PipelineInputsMeta):
    """
    ModeInputs is a facade for Inputs of a common task type, allowing the user
    to switch between task implementations by changing the 'mode' parameter.

    Extending classes should override the _modes dictionary with a set of
    key/value pairs, each pair mapping the mode name key to the task class
    value.
    """
    _modes = {}

    def __init__(self, context, mode=None, **parameters):
        """
        :param context: the pipeline context
        :param mode: name of the mode to activate; must be a key of _modes
        :param parameters: Inputs arguments, which must include 'vis'
        :raises ValueError: if mode is not one of the registered modes
        """
        # create a dictionary of Inputs objects, one of each type
        self._delegates = {k: task_cls.Inputs(context, vis=parameters['vis'])
                           for k, task_cls in self._modes.items()}

        # set the mode to the requested mode, thus setting the active Inputs
        self.mode = mode

        # set any parameters provided by the user
        for k, v in parameters.items():
            setattr(self, k, v)

    def __getattr__(self, name):
        # __getattr__ is only called when normal attribute lookup fails.
        # When this happens, we delegate to the currently selected Inputs.
        # First, however, we check whether the requested property is one of
        # the Python magic methods. If so, we return the standard
        # implementation. This is necessary for pickle, which checks for
        # __getnewargs__ and __getstate__.
        # NOTE(review): super().__getattr__ only exists if api.Inputs defines
        # it; otherwise these calls end in an AttributeError - confirm.
        if name.startswith('__') and name.endswith('__'):
            return super(ModeInputs, self).__getattr__(name)

        # _active can be missing while an instance is under construction,
        # eg. during copy or unpickling. The delegation below reads
        # self._active, so failing here prevents infinite recursion.
        if name == '_active':
            raise AttributeError('{!s} has no attribute {!r}'.format(type(self).__name__, name))

        if name in dir(self):
            return super(ModeInputs, self).__getattr__(name)

        LOG.trace('ModeInputs.{!s}: delegating to {!s}'.format(name, self._active.__class__.__name__))
        return getattr(self._active, name)

    def __setattr__(self, name, val):
        # If the property we're trying to set is one of this base class's
        # private variables, add it to our __dict__ using the standard
        # __setattr__ method
        if name in ('_active', '_delegates', '_mode', '_pipeline_casa_task'):
            LOG.trace('Setting {!s}.{!s} = {!r}'.format(self.__class__.__name__, name, val))
            return super(ModeInputs, self).__setattr__(name, val)

        # check whether this class has a getter/setter by this name. If so,
        # allow the write to __dict__
        for fn_name in dir(self):
            # our convention is to prefix the data variable for a
            # getter/setter with an underscore.
            if name in (fn_name, '_' + fn_name):
                LOG.trace('Getter/setter found: setting {!s}.{!s} = {!r}'.format(
                    self.__class__.__name__, name, val))
                super(ModeInputs, self).__setattr__(name, val)

                # overriding defaults of wrapped classes requires us to re-get
                # the value after setting it, as the property setter of this
                # superclass has probably transformed it, eg. None => 'inf'.
                # Furthermore, we do not return early, giving this function a
                # chance to set the parameter - with this new value - on the
                # wrapped classes too.
                val = getattr(self, name)

        # otherwise, set the said attribute on all of our delegate Inputs. In
        # doing so, the user can switch mode at any time and have the new
        # Inputs present with all their previously set parameters.
        for d in self._delegates.values():
            if hasattr(d, name):
                LOG.trace('Setting {!s}.{!s} = {!r}'.format(self.__class__.__name__, name, val))
                setattr(d, name, val)

    @property
    def mode(self):
        return self._mode

    @mode.setter
    def mode(self, value):
        if value not in self._modes:
            keys = [str(k) for k in self._modes]
            # build "a', 'b' or 'c"; with fewer than two modes there is
            # nothing to join with 'or'
            if len(keys) > 1:
                options = '\', \''.join(keys[:-1]) + '\' or \'' + keys[-1]
            else:
                options = ''.join(keys)
            msg = 'Mode must be one of \'{0}\' but got \'{1}\''.format(options, value)
            LOG.error(msg)
            raise ValueError(msg)

        self._active = self._delegates[value]
        self._mode = value

    def get_task(self):
        """
        Get the task appropriate to the current Inputs.
        """
        task_cls = self._modes[self._mode]
        return task_cls(self._active)

    def as_dict(self):
        """
        Return the properties of the active Inputs, updated with the
        properties collected from this ModeInputs itself.
        """
        props = utils.collect_properties(self._active)
        props.update(utils.collect_properties(self))
        return props

    def to_casa_args(self):
        """
        Return the CASA arguments of the active Inputs.
        """
        return self._active.to_casa_args()

    def __repr__(self):
        return pprint.pformat(self.as_dict())

    @classmethod
    def get_constructor_args(cls, ignore=('self', 'context')):
        """
        Get the union of all arguments accepted by this class's constructor
        and by the constructors of the Inputs classes it can switch between.

        :param ignore: argument names to leave out of the result, or None
        :return: set of argument names
        """
        all_args = set()

        # get the arguments for this class's constructor and add them to our
        # collection. getfullargspec is used as inspect.getargspec no longer
        # exists in recent Python 3 releases.
        all_args.update(inspect.getfullargspec(cls.__init__).args)

        # now do the same for each inputs class we can switch between
        for task_cls in cls._modes.values():
            all_args.update(inspect.getfullargspec(task_cls.Inputs.__init__).args)

        if ignore is not None:
            for i in ignore:
                all_args.discard(i)

        return all_args
# module utility functions ---------------------------------------------------

def all_unique(o):
    """
    Return True if the iterable argument contains at least two elements that
    differ from each other.

    Despite the name, this does not require every element to be distinct:
    [1, 1, 2] gives True. It returns False for an empty iterable and when
    all elements are the same. format_value_list relies on this behaviour to
    decide whether per-measurement set values can be collapsed to one value.

    :raises ValueError: if the argument is not iterable
    """
    # the ABC aliases were removed from the collections namespace in recent
    # Python 3 releases, so take Iterable from collections.abc
    from collections.abc import Iterable

    if not isinstance(o, Iterable):
        raise ValueError('Cannot determine uniqueness of non-iterables')

    hashes = {gen_hash(e) for e in o}
    return len(hashes) > 1


def gen_hash(o):
    """
    Makes a hash from a dictionary, list, tuple or set to any level, that
    contains only other hashable types (including any lists, tuples, sets,
    and dictionaries).

    :return: a tuple of element hashes for sets, tuples and lists; an int
        for everything else
    """
    if isinstance(o, (set, tuple, list)):
        return tuple([gen_hash(e) for e in o])

    if not isinstance(o, dict):
        return hash(o)

    # hash the frozenset itself so that the result does not depend on the
    # order in which the dictionary items happen to be iterated
    return hash(frozenset((k, gen_hash(v)) for k, v in o.items()))


def format_value_list(val):
    """
    Reduce a list of per-measurement set values to a single value where
    possible.

    A single value can be returned when only one value is present, because
    the inputs cover one measurement set, or because the values for each
    measurement set are all the same. Otherwise the list is returned as-is.
    """
    if len(val) == 0:
        return val
    if len(val) == 1:
        return val[0]
    if all_unique(val):
        return val
    return val[0]


def name_all_arguments(cls, *args, **kwargs):
    """
    Convert the arguments of a constructor call to a dict of named arguments.

    :param cls: the constructor function whose signature names the arguments
    :param args: arguments passed by position, excluding self
    :param kwargs: arguments passed by keyword
    :return: dict mapping argument name to value, excluding self
    """
    # we need to know the names of any non-key/value arguments supplied
    # in the constructor. The names of the argument can be found in the
    # constructor specification. getfullargspec is used as
    # inspect.getargspec no longer exists in recent Python 3 releases.
    argspec = inspect.getfullargspec(cls)

    # The constructor specification includes 'self', which is not passed
    # in either args or kwargs. Hence, a dummy value for self is supplied
    # as the first argument so that the argument indices do not need to be
    # shifted.
    if argspec.args and argspec.args[0] == 'self':
        args = ('dummy self',) + args

    # defaults is None, not an empty tuple, when no argument has a default
    num_defaults = len(argspec.defaults or ())

    # get args passed positionally (=args with no default)
    num_positional_args = len(argspec.args) - num_defaults
    named_args = dict(zip(argspec.args, args[:num_positional_args]))

    # get kw args passed by position (=args with a default, but key not
    # specified in call)
    named_args.update(zip(argspec.args[num_positional_args:], args[num_positional_args:]))

    named_args.update(kwargs)

    # having inserted self, we now remove it as the instance value is
    # supplied automatically by Python during instance creation
    named_args.pop('self', None)

    return named_args