import abc
import collections
import collections.abc
import datetime
import inspect
import os
import pickle
import re
import textwrap
import traceback
import uuid
from .mpihelpers import MPIEnvironment
from . import api
from . import casa_tools
from . import filenamer
from . import jobrequest
from . import logging
from . import pipelineqa
from . import project
from . import task_registry
from . import utils
from . import vdp
LOG = logging.get_logger(__name__)
# control generation of the weblog
DISABLE_WEBLOG = False
VISLIST_RESET_KEY = '_do_not_reset_vislist'
def timestamp(method):
def attach_timestamp_to_results(self, *args, **kw):
start = datetime.datetime.utcnow()
result = method(self, *args, **kw)
end = datetime.datetime.utcnow()
if result is not None:
result.timestamps = Timestamps(start, end)
return result
return attach_timestamp_to_results
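# Illustrative note (hypothetical usage, not part of the pipeline API): any
# execute() wrapped with @timestamp attaches UTC start/end times to the
# object it returns, so callers can measure task duration, e.g.
#
#     result = task.execute(dry_run=False)
#     elapsed = result.timestamps.end - result.timestamps.start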
def result_finaliser(method):
"""
Copy some useful properties to the Results object before returning it.
This is used in conjunction with execute(), where the Result could be
returned from a number of places but we don't want to set the properties
in each location.
TODO: refactor so this is done as part of execute!
"""
def finalise_pipeline_result(self, *args, **kw):
result = method(self, *args, **kw)
if isinstance(result, ResultsList) and len(result) == 0:
return result
elif result is not None:
inputs = self.inputs
result.inputs = inputs.as_dict()
result.stage_number = inputs.context.task_counter
try:
result.pipeline_casa_task = inputs._pipeline_casa_task
except AttributeError:
# sub-tasks may not have pipeline_casa_task, but we only need
# it for the top-level task
pass
return result
return finalise_pipeline_result
def capture_log(method):
def capture(self, *args, **kw):
# get the size of the CASA log before task execution
logfile = casa_tools.log.logfile()
size_before = os.path.getsize(logfile)
# execute the task
result = method(self, *args, **kw)
# copy the CASA log entries written since task execution to the result
with open(logfile, 'r') as casalog:
casalog.seek(size_before)
# sometimes we can't write properties, such as for flagdeteralma
# when the result is a dict
try:
result.casalog = casalog.read()
except Exception:
LOG.trace('Could not set casalog property on result of type '
'%s' % result.__class__)
# To save space in the pickle, delete any inner CASA logs. The web
# log will only write the outer CASA log to disk
if isinstance(result, collections.abc.Iterable):
for r in result:
if hasattr(r, 'casalog'):
del r.casalog
return result
return capture
class ModeTask(api.Task):
# override this if your inputs need visibility of all measurement sets in
# scope
is_multi_vis_task = False
def __init__(self, inputs):
super(ModeTask, self).__init__()
# complain if we were given the wrong type of inputs
if not isinstance(inputs, self.Inputs):
msg = '{0} requires inputs of type {1} but got {2}.'.format(
self.__class__.__name__,
self.Inputs.__name__,
inputs.__class__.__name__)
raise TypeError(msg)
self.inputs = inputs
self._delegate = inputs.get_task()
def execute(self, dry_run=True, **parameters):
self._check_delegate()
return self._delegate.execute(dry_run, **parameters)
def __getattr__(self, name):
self._check_delegate()
return getattr(self._delegate, name)
def _check_delegate(self):
"""
Update, if necessary, the delegate task so that it matches the
mode specified in the Inputs.
This function is necessary as the value of Inputs.mode can change
after the task has been constructed. Therefore, we cannot rely on any
delegate set at construction time. Instead, the delegate must be
updated on every execution.
"""
# given two modes, A and B, it's possible that A is a subclass of B,
# eg. PhcorChannelBandpass extends ChannelBandpass, therefore we
# cannot test whether the current instance is the correct type using
# isinstance. Instead, we need to compare class names.
mode = self.inputs.mode
mode_cls = self.inputs._modes[mode]
mode_cls_name = mode_cls.__name__
delegate_cls_name = self._delegate.__class__.__name__
if mode_cls_name != delegate_cls_name:
self._delegate = self.inputs.get_task()
# A simple named tuple to hold the start and end timestamps
Timestamps = collections.namedtuple('Timestamps', ['start', 'end'])
class Results(api.Results):
"""
Results is the base implementation of the Results API.
In practice, all results objects should subclass this object to take
advantage of the shared functionality.
"""
def __init__(self):
super(Results, self).__init__()
# set the value used to uniquely identify this object. This value will
# be used to determine whether this results has already been merged
# with the context
self._uuid = uuid.uuid4()
# property used to hold pipeline QA values
self.qa = pipelineqa.QAScorePool()
# property used to hold metadata and presentation-focused values
# destined for the weblog. Currently a dict, but could change.
self._metadata = {}
@property
def uuid(self):
"""
The unique identifier for this results object.
"""
return self._uuid
@property
def metadata(self):
"""
Object holding presentation-related values destined for the web log
"""
return self._metadata
def merge_with_context(self, context):
"""
Merge these results with the given context.
This method will be called during the execution of accept(). For
calibration tasks, a typical implementation will register caltables
with the pipeline callibrary.
At this point the result is deemed safe to merge, so no further checks
on the context need be performed.
:param context: the target
:class:`~pipeline.infrastructure.launcher.Context`
:type context: :class:`~pipeline.infrastructure.launcher.Context`
"""
LOG.debug('Null implementation of merge_with_context used for %s'
'' % self.__class__.__name__)
def accept(self, context=None):
"""
Accept these results, registering objects with the context and incrementing
stage counters as necessary in preparation for the next task.
"""
if context is None:
# context will be none when called from a CASA interactive
# session. When this happens, we need to locate the global context
# from the stack
import pipeline.h.cli.utils
context = pipeline.h.cli.utils.get_context()
# find whether this result is being accepted as part of a task
# execution or whether it's being accepted after task completion
task_completed = utils.task_depth() == 0
# Check to ensure this exact result was not already merged into the
# context.
self._check_for_remerge(context)
# If all goes well, this result is the one that will be appended to
# the results list of the context.
result_to_append = self
# PIPE-16: handle exceptions that may occur during the acceptance of
# the result into the context:
# Try to execute the task-specific (non-pipeline-framework) parts of
# results acceptance, which are the merging of the result into the
# context, and the task QA calculation (for top-level tasks).
try:
# execute our template function
self.merge_with_context(context)
# perform QA if accepting this result from a top-level task
if task_completed:
pipelineqa.qa_registry.do_qa(context, self)
# If an exception happened during the acceptance of a top-level
# pipeline task, then create a new FailedTaskResults that contains
# the exception traceback, and mark this new result as the one
# to append to the context results list.
except Exception as ex:
if task_completed:
# Log error message.
tb = traceback.format_exc()
LOG.error('Error while accepting pipeline task result into context.')
LOG.error(tb)
# Create a special result object representing the failed task.
failedresult = FailedTaskResults(self[0].task, ex, tb)
# Copy over necessary properties from real result.
failedresult.stage_number = self.stage_number
failedresult.inputs = self.inputs
failedresult.timestamps = self.timestamps
# Override the result that is to be appended to the context.
result_to_append = failedresult
# Re-raise the exception to allow the executor of pipeline tasks to
# decide how to proceed.
raise
# Whether the result acceptance went ok, or an exception occurred
# and a new FailedTaskResults was created, always carry out the
# following steps on the result_to_append, to ensure that e.g.
# the result is pickled to disk and the weblog is rendered.
finally:
if task_completed:
# If accept() is called at the end of a task, create a proxy for
# this result and pickle it to the appropriate weblog stage
# directory. This keeps the context size at a minimum.
proxy = ResultsProxy(context)
proxy.write(result_to_append)
result = proxy
else:
result = result_to_append
# Add the results object to the results list.
context.results.append(result)
# Use the context to find the report directory and write the equivalent
# pipeline CASA task calls to the pipeline script
if task_completed:
# this needs to come before web log generation as the filesizes of
# various logs and scripts are calculated during web log generation
write_pipeline_casa_tasks(context)
# generate weblog if accepting a result from outside a task execution
if task_completed and not DISABLE_WEBLOG:
# cannot import at initial import time due to cyclic dependency
import pipeline.infrastructure.renderer.htmlrenderer as htmlrenderer
htmlrenderer.WebLogGenerator.render(context)
# If running at DEBUG loglevel and this is a top-level task result,
# then store to disk a pickle of the context as it existed at the
# end of this pipeline stage; this may be useful for debugging.
if task_completed and LOG.isEnabledFor(logging.DEBUG):
basename = 'context-stage%s.pickle' % result_to_append.stage_number
path = os.path.join(context.output_dir,
context.name,
'saved_state',
basename)
utils.mkdir_p(os.path.dirname(path))
with open(path, 'wb') as outfile:
pickle.dump(context, outfile, -1)
def _check_for_remerge(self, context):
"""
Check whether this result has already been added to the given context.
"""
# context.results contains the list of results that have been merged
# with the context. Check whether the UUID of any result or sub-result
# in that list matches the UUID of this result.
for result in context.results:
if self._is_uuid_in_result(result):
msg = 'This result has already been added to the context'
LOG.error(msg)
raise ValueError(msg)
def _is_uuid_in_result(self, result):
"""
Return True if the UUID of this result matches the UUID of the given
result or any sub-result contained within.
"""
for subtask_result in getattr(result, 'subtask_results', ()):
if self._is_uuid_in_result(subtask_result):
return True
if result.uuid == self.uuid:
return True
return False
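# Illustrative sketch (not part of the pipeline API): a minimal Results
# subclass. The calapp/callibrary attributes used below are assumptions for
# illustration; real tasks should register whatever domain objects they own.
class _ExampleCalibrationResults(Results):
    """Example results object that registers caltables on acceptance."""

    def __init__(self, calapps=None):
        super(_ExampleCalibrationResults, self).__init__()
        self.calapps = calapps if calapps is not None else []

    def merge_with_context(self, context):
        # this is the only task-specific work required; accept() takes care
        # of QA, pickling the result and rendering the weblog
        for calapp in self.calapps:
            context.callibrary.add(calapp.calto, calapp.calfrom)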
class FailedTaskResults(Results):
"""
FailedTaskResults represents a results object for a task that encountered
an exception during execution.
"""
def __init__(self, origtask_cls, exception, tb):
super(FailedTaskResults, self).__init__()
self.exception = exception
self.origtask_cls = origtask_cls
self.task = FailedTask
self.tb = tb
def __repr__(self):
s = "FailedTaskResults:\n"\
"\toriginal task: {}\n".format(self.origtask_cls.__name__)
return s
class ResultsProxy(object):
def __init__(self, context):
self._context = context
def write(self, result):
"""
Write the pickled result to disk.
"""
# adopt the result's UUID, protecting against repeated addition to the
# context
self.uuid = result.uuid
self._write_stage_logs(result)
# only store the basename to allow for relocation between save and
# restore
self._basename = 'result-stage%s.pickle' % result.stage_number
path = os.path.join(self._context.output_dir,
self._context.name,
'saved_state',
self._basename)
utils.mkdir_p(os.path.dirname(path))
with open(path, 'wb') as outfile:
pickle.dump(result, outfile, pickle.HIGHEST_PROTOCOL)
def read(self):
"""
Read the pickle from disk, returning the unpickled object.
"""
path = os.path.join(self._context.output_dir,
self._context.name,
'saved_state',
self._basename)
with open(path, 'rb') as infile:
return utils.pickle_load(infile)
def _write_stage_logs(self, result):
"""
Take the CASA log snippets attached to each result and write them to
the appropriate weblog directory. The log snippet is deleted from the
result after a successful write to keep the pickle size down.
"""
if not hasattr(result, 'casalog'):
return
stage_dir = os.path.join(self._context.report_dir,
'stage%s' % result.stage_number)
if not os.path.exists(stage_dir):
os.makedirs(stage_dir)
stagelog_entries = result.casalog
start = result.timestamps.start
end = result.timestamps.end
stagelog_path = os.path.join(stage_dir, 'casapy.log')
with open(stagelog_path, 'w') as stagelog:
LOG.debug('Writing CASA log entries for stage %s (%s -> %s)' %
(result.stage_number, start, end))
stagelog.write(stagelog_entries)
# having written the log entries, the CASA log entries have no
# further use. Remove them to keep the size of the pickle small
delattr(result, 'casalog')
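# Illustrative sketch (hypothetical usage): accept() stores top-level results
# on disk through a proxy so the context stays small; consumers such as the
# weblog renderer recover the full result on demand.
#
#     proxy = ResultsProxy(context)
#     proxy.write(result)      # pickles to <output_dir>/<name>/saved_state/
#     restored = proxy.read()  # unpickles the stored result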
class ResultsList(Results):
def __init__(self, results=None):
super(ResultsList, self).__init__()
self.__results = []
if results:
self.__results.extend(results)
def __getitem__(self, item):
return self.__results[item]
def __iter__(self):
return self.__results.__iter__()
def __len__(self):
return len(self.__results)
def __str__(self):
return 'ResultsList({!s})'.format(str(self.__results))
def __repr__(self):
return 'ResultsList({!s})'.format(repr(self.__results))
def append(self, other):
self.__results.append(other)
def accept(self, context=None):
return super(ResultsList, self).accept(context)
def extend(self, other):
for o in other:
self.append(o)
def merge_with_context(self, context):
for result in self.__results:
result.merge_with_context(context)
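# Illustrative sketch (hypothetical usage): tasks that run once per
# measurement set typically gather their per-MS results in a ResultsList,
# whose merge_with_context() forwards to every item it holds.
#
#     all_results = ResultsList()
#     for single_result in per_ms_results:
#         all_results.append(single_result)
#     all_results.accept(context)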
class StandardTaskTemplate(api.Task, metaclass=abc.ABCMeta):
"""
StandardTaskTemplate is a template class for pipeline reduction tasks whose
execution can be described by a common four-step process:
#. prepare(): examine the measurement set and prepare a list of\
intermediate job requests to be executed.
#. execute the jobs
#. analyze(): analyze the output of the intermediate job requests,\
deciding if necessary which parameters provide the best results, and\
return these results.
#. return a final list of jobs to be executed using these best-fit\
parameters.
StandardTaskTemplate implements the :class:`Task` interface and steps 2 and 4
in the above process, leaving subclasses to implement
:func:`~StandardTaskTemplate.prepare` and :func:`~StandardTaskTemplate.analyse`.
A Task and its :class:`Inputs` are closely aligned. It is anticipated that
the Inputs for a Task will be created using the :attr:`Task.Inputs`
reference rather than locating and instantiating the partner class
directly, eg.::
i = ImplementingTask.Inputs.create_from_context(context)
"""
# HeadTail is an internal class used to associate properties with their
# associated measurement sets
HeadTail = collections.namedtuple('HeadTail', ('head', 'tail'))
def __init__(self, inputs):
"""
Create a new Task with an initial state based on the given inputs.
:param Inputs inputs: inputs required for this Task.
"""
super(StandardTaskTemplate, self).__init__()
# complain if we were given the wrong type of inputs
if isinstance(inputs, vdp.InputsContainer):
error = (inputs._task_cls.Inputs != self.Inputs)
else:
error = not isinstance(inputs, self.Inputs)
if error:
msg = '{0} requires inputs of type {1} but got {2}.'.format(
self.__class__.__name__,
self.Inputs.__name__,
inputs.__class__.__name__)
raise TypeError(msg)
self.inputs = inputs
is_multi_vis_task = False
@abc.abstractmethod
def prepare(self, **parameters):
"""
Prepare job requests for execution.
:param parameters: the parameters to pass through to the subclass.
Refer to the implementing subclass for specific information on
what these parameters are.
:rtype: a class implementing :class:`~pipeline.api.Result`
"""
raise NotImplementedError
@abc.abstractmethod
def analyse(self, result):
"""
Determine the best parameters by analysing the given result before
returning the final result.
:param result: the result returned by :func:`~StandardTaskTemplate.prepare`
:type result: a class implementing\
:class:`~pipeline.api.Result`
:rtype: \
:class:`~pipeline.api.Result`
"""
raise NotImplementedError
@timestamp
@capture_log
@result_finaliser
def execute(self, dry_run=True, **parameters):
# The filenamer deletes any identically named file when constructing
# the filename, which is desired when really executing a task but not
# when performing a dry run. This line disables the
# 'delete-on-generate' behaviour.
filenamer.NamingTemplate.dry_run = dry_run
if utils.is_top_level_task():
# Set the task name, but only if this is a top-level task. This
# name will be prepended to every data product name as a sign of
# their origin
try:
name = task_registry.get_casa_task(self.__class__)
except KeyError:
name = self.__class__.__name__
filenamer.NamingTemplate.task = name
# initialise the subtask counter, which will be subsequently
# incremented for every execute within this top-level task
self.inputs.context.task_counter += 1
LOG.info('Starting execution for stage %s',
self.inputs.context.task_counter)
self.inputs.context.subtask_counter = 0
# log the invoked pipeline task and its comment to
# casa_commands.log
_log_task(self, dry_run)
else:
self.inputs.context.subtask_counter += 1
# Create a copy of the inputs - including the context - and use the
# copy as our inputs from here on. Tasks can then merge results with
# the duplicate context at will, as we'll later restore the original.
original_inputs = self.inputs
self.inputs = utils.pickle_copy(original_inputs)
# create a job executor that tasks can use to execute subtasks
self._executor = Executor(self.inputs.context, dry_run)
# create a new log handler that will capture all messages above
# WARNING level.
handler = logging.CapturingHandler(logging.WARNING)
try:
# if this task does not handle multiple input mses but was
# invoked with multiple mses in its inputs, call our utility
# function to invoke the task once per ms.
if not self.is_multi_vis_task:
if isinstance(self.inputs, vdp.InputsContainer) or isinstance(self.inputs.vis, list):
return self._handle_multiple_vis(dry_run, **parameters)
if isinstance(self.inputs, vdp.InputsContainer):
container = self.inputs
LOG.info('Equivalent CASA call: %s', container._pipeline_casa_task)
# We should not pass unused parameters to prepare(), so first
# inspect the signature to find the names of the arguments, then
# create a dictionary containing only those parameters
prepare_args = set(inspect.getfullargspec(self.prepare).args)
prepare_parameters = dict(parameters)
for arg in parameters:
if arg not in prepare_args:
del prepare_parameters[arg]
# register the capturing log handler, buffering all messages so that
# we can add them to the result - and subsequently, the weblog
logging.add_handler(handler)
# get our result
result = self.prepare(**prepare_parameters)
# analyse them..
result = self.analyse(result)
# tag the result with the class of the originating task
result.task = self.__class__
# add the log records to the result
if not hasattr(result, 'logrecords'):
result.logrecords = handler.buffer
else:
result.logrecords.extend(handler.buffer)
return result
except Exception as ex:
# Create a special result object for the failed task, but only if
# this is a top-level task; otherwise, raise the exception higher
# up.
if utils.is_top_level_task():
# Get the task name from the task registry, otherwise use the
# task class name.
try:
name = task_registry.get_casa_task(self.__class__)
except KeyError:
name = self.__class__.__name__
# Log error message.
tb = traceback.format_exc()
LOG.error('Error executing pipeline task %s.' % name)
LOG.error(tb)
# Create a special result object representing the failed task.
result = FailedTaskResults(self.__class__, ex, tb)
# add the log records to the result
if not hasattr(result, 'logrecords'):
result.logrecords = handler.buffer
else:
result.logrecords.extend(handler.buffer)
return result
else:
raise
finally:
# restore the context to the original context
self.inputs = original_inputs
# now the task has completed, we tell the namer not to delete again
filenamer.NamingTemplate.dry_run = True
# delete the task name once the top-level task is complete
if utils.is_top_level_task():
filenamer.NamingTemplate.task = None
# now that the WARNING and above messages have been attached to the
# result, remove the capturing logging handler from all loggers
if handler:
logging.remove_handler(handler)
# delete the executor so that the pickled context can be released
self._executor = None
def _handle_multiple_vis(self, dry_run, **parameters):
"""
Handle a single task invoked for multiple measurement sets.
This function handles the case when the vis parameter on the Inputs
specifies multiple measurement sets. In this situation, we want to
invoke the task for each individual MS. We could do this by having
each task iterate over the measurement sets involved, but in order to
keep the task implementations as simple as possible, that complexity
(unless overridden) is handled by the template task instead.
If the task wants to handle the multiple measurement sets
itself it should override is_multi_vis_task.
"""
# The following code loops over the inputs in the container, executing
# the task once per measurement set and collecting the individual
# results in a ResultsList.
if len(self.inputs.vis) == 0:
# we don't return an empty list as the timestamp decorator wants
# to set attributes on this value, which it can't on a built-in
# list
return ResultsList()
container = self.inputs
LOG.info('Equivalent CASA call: %s', container._pipeline_casa_task)
results = ResultsList()
try:
for inputs in container:
self.inputs = inputs
single_result = self.execute(dry_run=dry_run, **parameters)
if isinstance(single_result, ResultsList):
results.extend(single_result)
else:
results.append(single_result)
return results
finally:
self.inputs = container
def _get_handled_headtails(self, names=None):
handled = collections.OrderedDict()
if names is None:
# no names to get so return empty dict
return handled
for name in names:
if hasattr(self.inputs, name):
property_value = getattr(self.inputs, name)
head = property_value[0]
tail = property_value[1:]
ht = StandardTaskTemplate.HeadTail(head=head, tail=tail)
handled[name] = ht
return handled
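# Illustrative sketch (not part of the pipeline API): a minimal task following
# the prepare()/analyse() template. The Inputs class, the casa_tasks.flagdata
# job factory and _ExampleCalibrationResults are assumptions for illustration.
#
#     class ExampleTaskInputs(vdp.StandardInputs):
#         def __init__(self, context, vis=None):
#             super(ExampleTaskInputs, self).__init__()
#             self.context = context
#             self.vis = vis
#
#     class ExampleTask(StandardTaskTemplate):
#         Inputs = ExampleTaskInputs
#
#         def prepare(self, **parameters):
#             # build a job request and run it through the shared executor
#             job = casa_tasks.flagdata(vis=self.inputs.vis, mode='summary')
#             self._executor.execute(job)
#             return _ExampleCalibrationResults()
#
#         def analyse(self, result):
#             # inspect the prepare() output and return the final result
#             return result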
class FailedTask(StandardTaskTemplate):
def __init__(self, context):
inputs = vdp.InputsContainer(self, context)
super(FailedTask, self).__init__(inputs)
class Executor(object):
def __init__(self, context, dry_run=True):
self._dry_run = dry_run
self._context = context
self._cmdfile = os.path.join(context.report_dir,
context.logs['casa_commands'])
@capture_log
def execute(self, job, merge=False, **kwargs):
"""
Execute the given job or subtask, returning its output.
:param job: a job or subtask
:type job: an object conforming to the :class:`~pipeline.api.Task`\
interface
:rtype: :class:`~pipeline.api.Result`
"""
# execute the job, capturing its results object
result = job.execute(dry_run=self._dry_run, **kwargs)
if self._dry_run:
return result
# if the job was a JobRequest, log it to our command log
if isinstance(job, jobrequest.JobRequest):
# don't print shutil commands from MPI servers as the interleaved
# commands become confusing.
is_MPI_server = MPIEnvironment.is_mpi_enabled and not MPIEnvironment.is_mpi_client
omit_log = is_MPI_server and job.fn.__module__ == 'shutil'
if not omit_log:
self._log_jobrequest(job)
# if requested, merge the result with the context. No type checking
# here.
if merge and not self._dry_run:
result.accept(self._context)
return result
def _log_jobrequest(self, job):
# CAS-5262: casa_commands.log written by the pipeline should
# be formatted to be more easily readable.
# replace the working directory with ''
job_str = re.sub('%s/' % self._context.output_dir, '',
str(job))
# wrap the text at the first open bracket
if '(' in job_str:
indent = (1+job_str.index('(')) * ' '
else:
indent = 10 * ' '
wrapped = textwrap.wrap(job_str,
subsequent_indent=indent,
width=80,
break_long_words=False)
with open(self._cmdfile, 'a') as cmdfile:
cmdfile.write('%s\n' % '\n'.join(wrapped))
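# Illustrative sketch (hypothetical usage; casa_tasks.gaincal is assumed here
# for illustration): JobRequests executed through the shared executor are also
# appended to casa_commands.log, giving a readable record of every CASA call
# made by the task.
#
#     job = casa_tasks.gaincal(vis=inputs.vis, caltable='example.g')
#     gaincal_result = self._executor.execute(job)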
def _log_task(task, dry_run):
if dry_run:
return
context = task.inputs.context
filename = os.path.join(context.report_dir,
context.logs['casa_commands'])
comment = ''
if not os.path.exists(filename):
wrapped = textwrap.wrap('# ' + _CASA_COMMANDS_PROLOGUE,
subsequent_indent='# ',
width=78,
break_long_words=False)
comment = ('raise Error(\'The casa commands log is not executable!\')\n'
'\n%s\n' % '\n'.join(wrapped))
comment += '\n# %s\n#\n' % getattr(task.inputs, '_pipeline_casa_task', 'unknown pipeline task')
# get the description of how this task functions and add it to the comment
comment += task_registry.get_comment(task.__class__)
with open(filename, 'a') as cmdfile:
cmdfile.write(comment)
def property_with_default(name, default, doc=None):
"""
Return a property whose value is reset to a default value when setting the
property value to None.
"""
# our standard name for the private property backing the public property
# is a prefix of one underscore
varname = '_' + name
def getx(self):
return object.__getattribute__(self, varname)
def setx(self, value):
if value is None:
value = default
object.__setattr__(self, varname, value)
# def delx(self):
# object.__delattr__(self, varname)
return property(getx, setx, None, doc)
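# Illustrative sketch (not part of the pipeline API): assigning None to a
# property created by property_with_default restores the default value.
class _ExampleInputs(object):
    niter = property_with_default('niter', 500, doc='number of clean iterations')

    def __init__(self, niter=None):
        self.niter = niter

# _ExampleInputs().niter          -> 500 (default restored)
# _ExampleInputs(niter=100).niter -> 100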
def write_pipeline_casa_tasks(context):
"""
Write the equivalent pipeline CASA tasks for results in the context to a
file
"""
pipeline_tasks = []
for proxy in context.results:
result = proxy.read()
try:
pipeline_tasks.append(result.pipeline_casa_task)
except AttributeError:
pipeline_tasks.append('# stage %s: unknown task generated %s '
'result' % (result.stage_number,
result.__class__.__name__))
task_string = '\n'.join([' %s' % t for t in pipeline_tasks])
# replace the working directory with ''
task_string = re.sub('%s/' % context.output_dir, '', task_string)
state_commands = []
for o in (context.project_summary, context.project_structure, context.project_performance_parameters):
state_commands += ['context.set_state({!r}, {!r}, {!r})'.format(cls, name, value)
for cls, name, value in project.get_state(o)]
template = '''__rethrow_casa_exceptions = True
context = h_init()
%s
try:
%s
finally:
h_save()
''' % ('\n'.join(state_commands), task_string)
f = os.path.join(context.report_dir, context.logs['pipeline_script'])
with open(f, 'w') as casatask_file:
casatask_file.write(template)
_CASA_COMMANDS_PROLOGUE = (
'This file contains CASA commands run by the pipeline. Although all commands required to calibrate the data are '
'included here, this file cannot be executed, nor does it contain heuristic and flagging calculations performed by '
'pipeline code. This file is useful to understand which CASA commands are being run by each pipeline task. If one '
'wishes to re-run the pipeline, one should use the pipeline script linked on the front page or By Task page of the '
'weblog. Some stages may not have any commands listed here, e.g. hifa_importdata if conversion from ASDM to MS is '
'not required.'
)