# Source code for pipeline.infrastructure.launcher

"""
The launcher module contains classes to initialize the pipeline, potentially
from a saved context state.
"""
import datetime
import os
import pickle
import pprint

from pipeline import environment
from . import callibrary
from . import casa_tools
from . import imagelibrary
from . import logging
from . import project
from . import utils

# module-level logger, named after this module per the project convention
LOG = logging.get_logger(__name__)


# minimum allowed CASA revision. Set to 0 or None to disable
MIN_CASA_REVISION = [5, 9, 9, 919]
# maximum allowed CASA revision. Set to 0 or None to disable
MAX_CASA_REVISION = None


class Context(object):
    """
    Context holds all pipeline state, consisting of metadata describing the
    data set, objects describing the pipeline calibration state, the tree of
    Results objects summarising the results of each pipeline task, and a
    small number of internal pipeline variables and objects.

    The aim of the Context class is to provide one central object to which
    all pipeline state is attached. Keeping all state in one object makes it
    easy to persist this one object, and thus all state, to disk as a Python
    pickle, allowing pipeline sessions to be interrupted and resumed.

    .. py:attribute:: project_summary

        project summary information.

    .. py:attribute:: project_structure

        ALMA project structure information.

    .. py:attribute:: observing_run

        the top-level (:class:`~pipeline.domain.observingrun.ObservingRun`)
        through which all other pipeline.domain objects can be accessed

    .. py:attribute:: callibrary

        the (:class:`~pipeline.infrastructure.callibrary.CalLibrary`) object
        holding the calibration state for registered measurement sets

    .. py:attribute:: calimlist

        the (:class:`~pipeline.infrastructure.imagelibrary.ImageLibrary`)
        object holding final images of calibrators

    .. py:attribute:: sciimlist

        the (:class:`~pipeline.infrastructure.imagelibrary.ImageLibrary`)
        object holding final images of science targets

    .. py:attribute:: results

        the list of (:class:`~pipeline.infrastructure.api.Result`) objects
        holding summaries of the pipeline run, one
        (:class:`~pipeline.infrastructure.api.Result`) for each task.

    .. py:attribute:: output_dir

        the directory to which pipeline data should be sent

    .. py:attribute:: raw_dir

        the directory holding the raw ASDMs or measurement sets (not used)

    .. py:attribute:: report_dir

        the directory where pipeline HTML reports should be sent

    .. py:attribute:: task_counter

        integer holding the index of the last task to complete

    .. py:attribute:: subtask_counter

        integer holding the index of the last subtask to complete

    .. py:attribute:: name

        name of the context; also forms the root of the filename used for
        the pickled state

    .. py:attribute:: imaging_mode

        imaging mode string; may be used to switch between imaging parameter
        heuristics; currently only used for deciding what products to export
    """

    def __init__(self, output_dir=None, name=None):
        # initialise the context name with something reasonable: a current
        # timestamp
        now = datetime.datetime.utcnow()
        self.name = name if name else now.strftime('pipeline-%Y%m%dT%H%M%S')

        # domain depends on infrastructure.casa_tools, so infrastructure
        # cannot depend on domain, hence the run-time import
        import pipeline.domain as domain
        self.observing_run = domain.ObservingRun()

        self.callibrary = callibrary.CalLibrary(self)
        self.calimlist = imagelibrary.ImageLibrary()
        self.sciimlist = imagelibrary.ImageLibrary()
        self.rmsimlist = imagelibrary.ImageLibrary()
        self.subimlist = imagelibrary.ImageLibrary()
        self.project_summary = project.ProjectSummary()
        self.project_structure = project.ProjectStructure()
        self.project_performance_parameters = project.PerformanceParameters()

        # both assignments below go through the property setters, which
        # normalise the values to absolute paths
        self.output_dir = output_dir
        self.products_dir = None

        self.task_counter = 0
        self.subtask_counter = 0
        self.results = []
        self.logs = {}
        self.contfile = None
        self.linesfile = None
        self.size_mitigation_parameters = {}
        self.imaging_parameters = {}
        self.clean_list_pending = []
        self.clean_list_info = {}
        self.sensitivities = []
        self.per_spw_cont_sensitivities_all_chan = {'robust': None, 'uvtaper': None}
        self.synthesized_beams = {'robust': None, 'uvtaper': None}
        self.imaging_mode = None

        LOG.trace('Creating report directory \'%s\'' % self.report_dir)
        utils.mkdir_p(self.report_dir)

        LOG.trace('Setting products directory to \'%s\'' % self.products_dir)
        LOG.trace('Pipeline stage counter set to {0}'.format(self.stage))

        LOG.todo('Add OUS registration task. Hard-coding log type to MOUS')
        self.logtype = 'MOUS'

        self.logs['casa_commands'] = 'casa_commands.log'
        self.logs['pipeline_script'] = 'casa_pipescript.py'
        self.logs['pipeline_restore_script'] = 'casa_piperestorescript.py'

    @property
    def stage(self):
        # stage label combining task and subtask counters, e.g. '3_1'
        return '%s_%s' % (self.task_counter, self.subtask_counter)

    @property
    def report_dir(self):
        # HTML reports live under <output_dir>/<context name>/html
        return os.path.join(self.output_dir, self.name, 'html')

    @property
    def output_dir(self):
        return self._output_dir

    @output_dir.setter
    def output_dir(self, value):
        # None defaults to the current working directory; the stored value
        # is always an absolute path
        if value is None:
            value = './'
        value = os.path.abspath(value)
        LOG.trace('Setting output_dir to \'%s\'' % value)
        self._output_dir = value

    @property
    def products_dir(self):
        return self._products_dir

    @products_dir.setter
    def products_dir(self, value):
        # None defaults to a 'products' directory that is a sibling of
        # output_dir
        if value is None:
            (root_dir, _) = os.path.split(self.output_dir)
            value = os.path.join(root_dir, 'products')
        value = os.path.abspath(value)
        LOG.trace('Setting products_dir to \'%s\'' % value)
        self._products_dir = value

    def save(self, filename=None):
        """
        Pickle this Context to disk.

        :param filename: name of the output file. Defaults to
            '<context name>.context' when empty or None.
        """
        if filename in ('', None):
            filename = '%s.context' % self.name
        with open(filename, 'wb') as context_file:
            LOG.info('Saving context to \'{0}\''.format(filename))
            pickle.dump(self, context_file, protocol=-1)

    def __str__(self):
        ms_names = [ms.name for ms in self.observing_run.measurement_sets]
        return ('Context(name=\'{0}\', output_dir=\'{1}\')\n'
                'Registered measurement sets:\n{2}'
                ''.format(self.name, self.output_dir, pprint.pformat(ms_names)))

    def __repr__(self):
        return '<Context(name={!r})>'.format(self.name)

    def set_state(self, cls, name, value):
        """
        Set a context property using the class name, property name and
        property value. The class name should be one of:

         1. 'ProjectSummary'
         2. 'ProjectStructure'
         3. 'PerformanceParameters'

        Background: see CAS-9497 - add infrastructure to translate values
        from intent.xml to setter functions in casa_pipescript.

        :param cls: class identifier
        :param name: property to set
        :param value: value to set
        :return:
        """
        m = {
            'ProjectSummary': self.project_summary,
            'ProjectStructure': self.project_structure,
            'PerformanceParameters': self.project_performance_parameters
        }
        # raises KeyError for an unrecognised class identifier
        instance = m[cls]
        setattr(instance, name, value)
class Pipeline(object):
    """
    Pipeline is the entry point for initialising the pipeline. It is
    responsible for the creation of new ~Context objects and for loading
    saved Contexts from disk.

    TODO replace this class with a static factory method on Context?
    """

    def __init__(self, context=None, output_dir='./', loglevel='info',
                 casa_version_check=True, name=None, plotlevel='default',
                 path_overrides=None):
        """
        Initialise the pipeline, creating a new ~Context or loading a saved
        ~Context from disk.

        :param context: filename of the pickled Context to load from disk.
            Specifying 'last' loads the last-saved Context, while passing
            None creates a new Context.
        :type context: string
        :param output_dir: root directory to which all output will be written
        :type output_dir: string
        :param loglevel: pipeline log level
        :type loglevel: string
        :param casa_version_check: enable (True) or bypass (False) the CASA
            version check. Default is True.
        :type casa_version_check: boolean
        :param name: name to give a newly-created Context
        :param plotlevel: session-wide plot detail setting
        :param path_overrides: mapping of Context attribute names to values
            that override those in a Context loaded from disk
        """
        # None sentinel rather than a mutable {} default, which would be
        # shared between all calls
        if path_overrides is None:
            path_overrides = {}

        # configure logging with the preferred log level
        logging.set_logging_level(level=loglevel)

        # Prevent users from running the pipeline on old or incompatible
        # versions of CASA by comparing the CASA subversion revision against
        # our expected minimum and maximum.
        # NOTE(review): the identity test means any truthy non-True value
        # also bypasses the check; retained for backwards compatibility.
        if casa_version_check is True:
            if MIN_CASA_REVISION and environment.compare_casa_version('<', MIN_CASA_REVISION):
                msg = ('Minimum CASA revision for the pipeline is %s, '
                       'got CASA %s.' % (MIN_CASA_REVISION, environment.casa_version))
                LOG.critical(msg)

            if MAX_CASA_REVISION and environment.compare_casa_version('>', MAX_CASA_REVISION):
                msg = ('Maximum CASA revision for the pipeline is %s, '
                       'got CASA %s.' % (MAX_CASA_REVISION, environment.casa_version))
                LOG.critical(msg)

        # if no previous context was specified, create a new context for the
        # given measurement set
        if context is None:
            self.context = Context(output_dir=output_dir, name=name)
        # otherwise load the context from disk..
        else:
            # .. by finding either last session, or..
            if context == 'last':
                context = self._find_most_recent_session()

            # .. the user-specified file
            with open(context, 'rb') as context_file:
                LOG.info('Reading context from file {0}'.format(context))
                last_context = utils.pickle_load(context_file)
                self.context = last_context

            for k, v in path_overrides.items():
                setattr(self.context, k, v)

        self._link_casa_log(self.context)

        # define the plot level as a global setting rather than on the
        # context, as we want it to be a session-wide setting and adjustable
        # mid-session for interactive use.
        import pipeline.infrastructure as infrastructure
        infrastructure.set_plot_level(plotlevel)

    def _link_casa_log(self, context):
        """
        Link the current CASA log into the context's report directory and
        record its name in context.logs['casalogs'].
        """
        report_dir = context.report_dir

        # create a hard-link to the current CASA log in the report directory
        src = casa_tools.log.logfile()
        dst = os.path.join(report_dir, os.path.basename(src))
        if not os.path.exists(dst):
            try:
                os.link(src, dst)
            except OSError:
                # hard links fail across filesystems; fall back to a symlink
                LOG.error('Error creating hard link to CASA log')
                LOG.warning('Reverting to symbolic link to CASA log. This is unsupported!')
                try:
                    os.symlink(src, dst)
                except OSError:
                    LOG.error('Well, no CASA log for you')

        # the web log creates links to each casa log. The name of each CASA
        # log is appended to the context.
        if 'casalogs' not in context.logs:
            # list as one casa log will be created per CASA session
            context.logs['casalogs'] = []
        if src not in context.logs['casalogs']:
            context.logs['casalogs'].append(os.path.basename(dst))

    def _find_most_recent_session(self, directory='./'):
        """
        Return the name of the most recently modified '.context' file in
        the given directory.

        :raises ValueError: if the directory contains no '.context' files
        """
        # list all the files in the directory..
        files = [f for f in os.listdir(directory) if f.endswith('.context')]

        # .. and from these matches, create a dict mapping files to their
        # modification timestamps. os.path.join is used rather than string
        # concatenation so that directories without a trailing separator
        # are handled correctly..
        name_n_timestamp = dict([(f, os.stat(os.path.join(directory, f)).st_mtime)
                                 for f in files])

        # .. then return the file with the most recent timestamp
        return max(name_n_timestamp, key=name_n_timestamp.get)

    def __repr__(self):
        ms_names = [ms.name for ms in self.context.observing_run.measurement_sets]
        return 'Pipeline({0})'.format(ms_names)

    def close(self):
        """
        Pickle the current context to disk.

        NOTE(review): the output filename is the bare context name, without
        the '.context' extension used by Context.save() - confirm whether
        this asymmetry is intentional.
        """
        filename = self.context.name
        # 'wb' rather than the previous 'r+b': r+b raised FileNotFoundError
        # when the file did not yet exist, and left stale trailing bytes
        # when the new pickle was shorter than the old one
        with open(filename, 'wb') as session:
            pickle.dump(self.context, session, protocol=-1)