"""
The pipelineqa module contains the base classes and plugin registry for the
pipeline's QA framework.

This module contains four key classes:

    * QAScore: the base class for QA scores
    * QAScorePool: the container for lists of QAScores
    * QAPlugin: the base class for task-specific QA handlers
    * QARegistry: the registry and manager for QA plug-ins

Tasks provide and register their own QA handlers, each of which extends
QAPlugin. These QA handlers analyse the results of a task, adding QA scores
to the Result for display in the web log.

The pipeline QA framework is activated whenever a Results instance is accepted
into the pipeline context. The framework operates by calling
is_handler_for(result) on each registered QAPlugin, passing it the accepted
Results instance for inspection. QAPlugins that claim to handle the Result are
given the Result for processing. In this step, the QA framework calls
QAPlugin.handle(context, result), the method overridden by the task-specific
QAPlugin.
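
Example: a minimal sketch of a task-specific QA handler. MyTaskResults and
the 'result.qa' attribute holding the Result's QAScorePool are assumptions
made for illustration::

    class MyTaskQAPlugin(QAPlugin):
        result_cls = MyTaskResults
        child_cls = None

        def handle(self, context, result):
            # a real handler would calculate scores from the task results
            result.qa.pool.append(
                QAScore(1.0, longmsg='No problems found', shortmsg='QA pass'))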
"""
import abc
import collections
import collections.abc
import enum
import operator
from typing import List, Optional, Set

from . import logging

LOG = logging.get_logger(__name__)

# QAOrigin holds information to help understand how and from where the QA
# scores are derived
QAOrigin = collections.namedtuple('QAOrigin', 'metric_name metric_score metric_units')

# Default origin for QAScores that do not define their own origin
NULL_ORIGIN = QAOrigin(metric_name='Unknown metric',
                       metric_score='N/A',
                       metric_units='')
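
# Example (hypothetical metric and values): a score derived from a measured
# phase RMS of 12.3 degrees could record its provenance as
#
#   QAOrigin(metric_name='phase_rms', metric_score=12.3, metric_units='deg')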


class WebLogLocation(enum.Enum):
    """
    WebLogLocation is an enumeration attached to each QA score, specifying
    where in the web log this QA score should be included.
    """
    # Render output to the QA details banner at the top of the task detail page
    BANNER = enum.auto()

    # Render output to QA section of the accordion at bottom of task detail page
    ACCORDION = enum.auto()

    # Include this QA score in calculations, but do not render it
    HIDDEN = enum.auto()

    # Place this score in the banner or accordion, as appropriate. This
    # emulates pre-Cycle 7 behaviour.
    UNSET = enum.auto()


class TargetDataSelection:
    """
    TargetDataSelection is a struct to hold data selection metadata. Its
    various properties (vis, scan, spw, etc.) should be set to specify to
    which subset of data something applies.
    """

    def __init__(self, session: Optional[Set[str]] = None, vis: Optional[Set[str]] = None,
                 scan: Optional[Set[int]] = None, spw: Optional[Set[int]] = None,
                 field: Optional[Set[int]] = None, intent: Optional[Set[str]] = None,
                 ant: Optional[Set[int]] = None, pol: Optional[Set[str]] = None):
        # default omitted arguments to new, independent set instances; a
        # shared mutable default argument would be a bug
        self.session = session if session is not None else set()
        self.vis = vis if vis is not None else set()
        self.scan = scan if scan is not None else set()
        self.spw = spw if spw is not None else set()
        self.field = field if field is not None else set()
        self.intent = intent if intent is not None else set()
        self.ant = ant if ant is not None else set()
        self.pol = pol if pol is not None else set()

    def __str__(self):
        attr_names = ['session', 'vis', 'scan', 'spw', 'field', 'intent', 'ant', 'pol']
        attr_strs = []
        for attr_name in attr_names:
            val = getattr(self, attr_name)
            if not val:
                continue
            msg = '{}={}'.format(attr_name, ','.join([str(o) for o in sorted(val)]))
            attr_strs.append(msg)
        all_attrs = ', '.join(attr_strs)
        return f'TargetDataSelection({all_attrs})'
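
# Example (hypothetical selection): empty attributes are omitted from the
# string representation, so
#
#   str(TargetDataSelection(vis={'a.ms'}, spw={16, 18}))
#
# renders as 'TargetDataSelection(vis=a.ms, spw=16,18)'.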


class QAScore(object):
    def __init__(self, score, longmsg='', shortmsg='', vis=None, origin=NULL_ORIGIN,
                 weblog_location=WebLogLocation.UNSET, hierarchy='',
                 applies_to: Optional[TargetDataSelection] = None):
        """
        QAScore represents a normalised assessment of data quality.

        The QA score is normalised to the range 0.0 to 1.0. Any score outside
        this range will be truncated at presentation time to lie within this
        range.

        The long message may be rendered on the task detail page, depending on
        whether this score is considered significant and the rendering hints
        given by the weblog_location argument.

        The short message associated with a QA score is used on the task
        summary page when this task is considered representative for the
        stage.

        A QAScore is derived from an unnormalised metric, which should be
        provided as the 'origin' argument when available. These metrics may be
        exported to the AQUA report, depending on their significance to the
        task QA and whether AQUA metric export is enabled for the particular
        task.

        The weblog_location and hierarchy attributes are intended to be used
        together, set by appropriate task-specific aggregation logic in the
        QAPlugin code, to specify how this score should be aggregated in
        overall score calculations and if/how it should be presented in the
        weblog.

        The hierarchy attribute should be in dotted string format, e.g.,
        'amp_vs_freq.amp.slope'. QAPlugins use this metadata to organise how
        the score should be grouped for processing and aggregation.

        :param score: numeric QA score, in range 0.0 to 1.0
        :param longmsg: verbose textual description of this score
        :param shortmsg: concise textual summary of this score
        :param vis: name of measurement set assessed by this QA score
            (DEPRECATED)
        :param origin: metric from which this score was calculated
        :param weblog_location: destination for web log presentation of this
            score
        :param hierarchy: location in QA hierarchy, in dot-separated format
        :param applies_to: data selection covered by this QA assessment
        """
        self.score = score
        self.longmsg = longmsg
        self.shortmsg = shortmsg

        if applies_to is None:
            applies_to = TargetDataSelection()
        # if the deprecated 'vis' argument is supplied, add it to the data
        # selection
        # TODO refactor all old uses of QAScore to supply applies_to rather
        # than vis
        if vis is not None:
            applies_to.vis.add(vis)
        self.applies_to = applies_to

        self.origin = origin
        self.weblog_location = weblog_location
        self.hierarchy = hierarchy

    def __str__(self):
        return 'QAScore({!r}, {!r}, {!r}, {!s})'.format(
            self.score, self.longmsg, self.shortmsg, self.applies_to)

    def __repr__(self):
        origin = None if self.origin is NULL_ORIGIN else self.origin
        return 'QAScore({!s}, longmsg={!r}, shortmsg={!r}, origin={!r}, applies_to={!s})'.format(
            self.score, self.longmsg, self.shortmsg, origin, self.applies_to)
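
# Example (hypothetical metric and values): a score covering spw 16 of a
# single measurement set might be constructed as
#
#   QAScore(0.85,
#           longmsg='Phase RMS slightly elevated for spw 16',
#           shortmsg='Elevated phase RMS',
#           origin=QAOrigin(metric_name='phase_rms',
#                           metric_score=42.1,
#                           metric_units='deg'),
#           applies_to=TargetDataSelection(vis={'a.ms'}, spw={16}))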


class QAScorePool(object):
    """
    QAScorePool is a container for the QAScores attached to a task Result. It
    also provides the single representative score for that Result.
    """
    all_unity_longmsg = 'All QA completed successfully'
    all_unity_shortmsg = 'QA pass'

    def __init__(self):
        self.pool: List[QAScore] = []
        self._representative: Optional[QAScore] = None

    @property
    def representative(self):
        if self._representative is not None:
            return self._representative

        if not self.pool:
            return QAScore(None, 'No QA scores registered for this task', 'No QA')

        # if all([s.score >= 0.9 for s in self.pool]):
        #     return QAScore(min([s.score for s in self.pool]), self.all_unity_longmsg, self.all_unity_shortmsg)

        # maybe have different algorithms here. for now just return the
        # QAScore with minimum score
        return min(self.pool, key=operator.attrgetter('score'))

    @representative.setter
    def representative(self, value):
        self._representative = value
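
# Example: with pool scores of 0.9 and 0.4 and no representative explicitly
# set, the representative property returns the QAScore with score 0.4.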


class QAPlugin(object, metaclass=abc.ABCMeta):
    """
    QAPlugin is the mandatory base class for all pipeline QA handlers.

    Each pipeline task should create its own task-specific QA handler that
    extends QAPlugin and implements the QAPlugin.handle(context, result)
    method to perform QA analysis specific to that task. New QA handlers
    should specify which type of Results classes they process by defining the
    result_cls and child_cls class properties. If the same Results class is
    returned by multiple tasks, e.g., fluxscale and setjy, then the
    generating_task class property should also be defined, which will cause
    the handler to be activated only when the Result instance is generated by
    the specified task.

    The results structure for many pipeline tasks is to return a ResultsList
    container object that contains many task-specific Results instances, one
    per EB. Two QAPlugins must be registered for this type of task: one to
    process the per-EB Results leaf objects, and another to process the
    containing ResultsList, pulling up the QA scores on each per-EB Result
    into the ResultsList's QAScorePool and setting a representative score.
    This can be achieved with two new QAPlugins, e.g.,

        # This will process the per-EB results
        class MyTaskQAPlugin(QAPlugin):
            result_cls = MyTaskResults
            child_cls = None

        # This will process the container
        class MyTaskContainerQAPlugin(QAPlugin):
            result_cls = ResultsList
            child_cls = MyTaskResults

    Within QAPlugin.handle(context, result), a QA Handler can analyse, modify,
    or make additions to the Results instances in any way it sees fit. In
    practice, the standard modification is to create and add one or more new
    QAScore instances to the QAScorePool attribute of the Result.

    Extending the QAPlugin base class automatically registers the subclass
    with the pipeline QA framework. However, QAPlugin must be explicitly
    extended, and not implicitly inherited via another subclass of QAPlugin.
    Put another way, if class Foo extends QAPlugin, and class Bar extends Foo,
    only Foo is registered with the QA framework. To register Bar, the class
    definition must use multiple inheritance, e.g.,
    'class Bar(Foo, QAPlugin):'.
    """
    # the Results class this handler is expected to handle
    result_cls = None

    # if result_cls is a list, the type of classes it is expected to contain
    child_cls = None

    # the task class that generated the results, or None if it should handle
    # all results of this type regardless of which task generated it
    generating_task = None

    def is_handler_for(self, result):
        """
        Return True if this QAPlugin can process the Result.

        :param result: the task Result to inspect
        :return: True if the Result can be processed
        """
        # if the result is not a list or the expected results class, return
        # False
        if not isinstance(result, self.result_cls):
            return False

        # this is the expected class and we weren't expecting any children,
        # so we should be able to handle the result
        if self.child_cls is None and (self.generating_task is None
                                       or result.task is self.generating_task):
            return True

        try:
            if all([isinstance(r, self.child_cls) and
                    (self.generating_task is None or r.task is self.generating_task)
                    for r in result]):
                return True
            return False
        except (AttributeError, TypeError):
            # catch the cases when the result does not have a task attribute
            # or is not iterable
            return False

    @abc.abstractmethod
    def handle(self, context, result):
        pass
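
# Example (a sketch; MyTaskResults, a ResultsList iterable over its child
# results, and a 'result.qa' QAScorePool attribute are assumptions made for
# illustration): a container handler that pulls the per-EB scores up into the
# ResultsList's own score pool.
#
#   class MyTaskContainerQAPlugin(QAPlugin):
#       result_cls = ResultsList
#       child_cls = MyTaskResults
#
#       def handle(self, context, result):
#           # gather the QA scores from every child result into the container
#           result.qa.pool.extend(
#               score for child in result for score in child.qa.pool)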


class QARegistry(object):
    """
    The registry and manager of the pipeline QA framework.

    The responsibility of the QARegistry is to pass Results to QAPlugins that
    can handle them.
    """

    def __init__(self):
        self.__plugins_loaded = False
        self.__handlers = []

    def add_handler(self, handler):
        task = handler.generating_task.__name__ if handler.generating_task else 'all'
        child_name = ''
        if hasattr(handler.child_cls, '__name__'):
            child_name = handler.child_cls.__name__
        elif isinstance(handler.child_cls, collections.abc.Iterable):
            child_name = str([x.__name__ for x in handler.child_cls])
        container = 's of %s' % child_name if child_name else ''
        s = '%s%s results generated by %s tasks' % (handler.result_cls.__name__, container, task)
        LOG.debug('Registering %s as new pipeline QA handler for %s', handler.__class__.__name__, s)
        self.__handlers.append(handler)

    def do_qa(self, context, result):
        if not self.__plugins_loaded:
            for plugin_class in QAPlugin.__subclasses__():
                self.add_handler(plugin_class())
            self.__plugins_loaded = True

        # if this result is iterable, process the lower-level scalar results
        # first
        if isinstance(result, collections.abc.Iterable):
            for r in result:
                self.do_qa(context, r)

        # register the capturing log handler, buffering all messages so that
        # we can add them to the result - and subsequently, the weblog
        logging_handler = logging.CapturingHandler(logging.WARNING)
        logging.add_handler(logging_handler)

        try:
            # with the leaf results processed, the containing handler can now
            # collate the lower-level scores or process them as a group
            for handler in self.__handlers:
                if handler.is_handler_for(result):
                    LOG.debug('%s handling QA analysis for %s', handler.__class__.__name__,
                              result.__class__.__name__)
                    handler.handle(context, result)

            if hasattr(result, 'logrecords'):
                result.logrecords.extend(logging_handler.buffer)
        finally:
            # now that the messages from the QA stage have been attached to
            # the result, remove the capturing logging handler from all
            # loggers
            logging.remove_handler(logging_handler)


qa_registry = QARegistry()
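
# Example (a sketch; 'context' and 'result' stand in for a pipeline Context
# and an accepted task Result): QA is invoked through the module-level
# registry, which dispatches to every registered QAPlugin whose
# is_handler_for(result) returns True.
#
#   qa_registry.do_qa(context, result)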