# Source code for pipeline.infrastructure.renderer.regression
"""
The regression module contains base classes and plugin registry for the
pipeline's regression test value extractor framework.
This module contains two classes:
* RegressionExtractor: the base class for extractor plug-ins
* RegressionExtractorRegistry: the registry and manager for plug-ins
Tasks provide and register their own extractors that each extends
RegressionExtractor. These extractor plug-ins analyse the results of a task,
extracting pertinent values and writing them to a dict. The keys of the
output dict identify the value; the values of the dict are the extracted
values themselves.
The regression extractor framework is activated whenever a Results instance is
accepted into the pipeline context. The framework operates by calling
is_handler_for(result) on each registered RegressionExtractor, passing it the
accepted Results instance for inspection. Extractors that claim to handle the
Result are given the Result for processing. In this step, the framework calls
RegressionExtractor.handle(result), the method overridden by the task-specific
extractor.
"""
import abc
import collections
import collections.abc
import os.path
import re
from collections import OrderedDict
from typing import Dict

from pipeline.domain.measures import FluxDensityUnits
from pipeline.h.tasks.applycal.applycal import ApplycalResults
from pipeline.h.tasks.common.commonfluxresults import FluxCalibrationResults
from pipeline.hif.tasks.applycal.ifapplycal import IFApplycal
from pipeline.hifa.tasks.fluxscale.gcorfluxscale import GcorFluxscale
from pipeline.hifa.tasks.gfluxscaleflag.resultobjects import GfluxscaleflagResults

from .. import logging
from ..taskregistry import task_registry
LOG = logging.get_logger(__name__)
class RegressionExtractor(object, metaclass=abc.ABCMeta):
    """
    RegressionExtractor is the mandatory base class for all regression test
    result extractors.

    Subclasses declare which Results class (and, optionally, which generating
    task) they handle via the class attributes below, and implement handle()
    to extract key/value pairs from a matching result.
    """
    # the Results class this handler is expected to handle
    result_cls = None
    # if result_cls is a list, the type of classes it is expected to contain
    child_cls = None
    # the task class that generated the results, or None if it should handle
    # all results of this type regardless of which task generated it
    generating_task = None

    def is_handler_for(self, result):
        """
        Return True if this RegressionExtractor can process the Result.

        :param result: the task Result to inspect
        :return: True if the Result can be processed
        """
        # if the result is not a list or the expected results class,
        # return False
        if not isinstance(result, self.result_cls):
            return False

        # this is the expected class and we weren't expecting any
        # children, so we should be able to handle the result
        if self.child_cls is None and (self.generating_task is None
                                       or result.task is self.generating_task):
            return True

        try:
            return all(isinstance(r, self.child_cls)
                       and (self.generating_task is None or r.task is self.generating_task)
                       for r in result)
        except (TypeError, AttributeError):
            # TypeError: result is not iterable;
            # AttributeError: a child result has no task attribute
            return False

    @abc.abstractmethod
    def handle(self, result) -> Dict[str, float]:
        """
        Extract regression test values from a task result.

        This method should return a dict of
        {'applycal.new_flags.science': 0.34,
         'applycal.new_flags.bandpass': 0.53}

        :param result: the task Result to process
        :return: dict mapping value identifiers to extracted values
        """
        # NotImplementedError, not the NotImplemented singleton, is the
        # correct exception for an unimplemented abstract method
        raise NotImplementedError
class RegressionExtractorRegistry(object):
    """
    The registry and manager of the regression result extractor framework.

    The responsibility of the RegressionResultRegistry is to pass Results to
    RegressionExtractors that can handle them.
    """

    def __init__(self):
        # True once the RegressionExtractor subclasses have been instantiated
        # and registered; done lazily on the first call to handle()
        self.__plugins_loaded = False
        # the registered RegressionExtractor instances
        self.__handlers = []

    def add_handler(self, handler):
        """
        Register a RegressionExtractor instance with this registry.

        :param handler: the RegressionExtractor to register
        """
        task = handler.generating_task.__name__ if handler.generating_task else 'all'
        child_name = ''
        if hasattr(handler.child_cls, '__name__'):
            child_name = handler.child_cls.__name__
        elif isinstance(handler.child_cls, collections.abc.Iterable):
            child_name = str([x.__name__ for x in handler.child_cls])
        # only describe the container contents when child results are
        # expected; the old unconditional format produced messages like
        # 'ApplycalResultss of  results...' for childless handlers
        container = 's of %s' % child_name if child_name else ''
        s = '{}{} results generated by {} tasks'.format(handler.result_cls.__name__, container, task)
        LOG.debug('Registering {} as new regression result handler for {}'.format(handler.__class__.__name__, s))
        self.__handlers.append(handler)

    def handle(self, result):
        """
        Extract regression test values from a result, dispatching to every
        registered extractor that claims to handle it.

        :param result: the (possibly iterable) task Result to process
        :return: dict of extracted values, in processing order
        """
        if not self.__plugins_loaded:
            for plugin_class in RegressionExtractor.__subclasses__():
                self.add_handler(plugin_class())
            self.__plugins_loaded = True

        # accumulates the extracted key/value pairs
        extracted = {}

        # process leaf results first
        # NOTE: collections.abc.Iterable, as collections.Iterable was removed
        # in Python 3.10
        if isinstance(result, collections.abc.Iterable):
            for r in result:
                d = self.handle(r)
                extracted = union(extracted, d)

        # process the group-level results
        for handler in self.__handlers:
            if handler.is_handler_for(result):
                LOG.debug('{} extracting regression results for {}'.format(handler.__class__.__name__,
                                                                           result.__class__.__name__))
                d = handler.handle(result)
                extracted = union(extracted, d)

        return extracted
def union(d1: dict, d2: dict):
    """
    Return the union of two dicts, raising an exception if duplicate keys are
    detected in the input dicts.
    """
    duplicates = key_intersection(d1, d2)
    if duplicates:
        raise ValueError('Regression keys are duplicated: {}'.format(duplicates))
    # keys and values are simple strings/scalars, so a shallow copy suffices;
    # OrderedDict keeps the extracted results in processing order
    merged = OrderedDict(d1)
    merged.update(d2)
    return merged
def key_intersection(d1: dict, d2: dict):
    """
    Compare keys of two dicts, returning duplicate keys.
    """
    # dict views support set operations directly
    return d1.keys() & d2.keys()
# module-level singleton through which extract_regression_results() dispatches
registry = RegressionExtractorRegistry()
class FluxcalflagRegressionExtractor(RegressionExtractor):
    """
    Regression test result extractor for hifa_gfluxscaleflag.

    The extracted values are:
       - the number of flagged rows before this task
       - the number of flagged rows after this task
    """
    result_cls = GfluxscaleflagResults
    child_cls = None

    def handle(self, result):
        """
        Extract flagging totals and QA scores from a gfluxscaleflag result.

        :param result: the GfluxscaleflagResults to process
        :return: dict of extracted values
        """
        prefix = get_prefix(result)

        flag_totals = {summary['name']: summary['flagged']
                       for summary in result.cafresult.summaries}
        num_before = int(flag_totals['before'])
        # no 'after' summary is present when the task applied no new flags
        num_after = int(flag_totals['after']) if 'after' in flag_totals else num_before

        extracted = OrderedDict([
            ('{}.num_rows_flagged.before'.format(prefix), num_before),
            ('{}.num_rows_flagged.after'.format(prefix), num_after),
        ])
        extracted.update(extract_qa_score_regression(prefix, result))
        return extracted
class GcorFluxscaleRegressionExtractor(RegressionExtractor):
    """
    Regression test result extractor for hifa_gcorfluxscale.

    The extracted values are:
       - Stokes I for each field and spw, in Jy
    """
    result_cls = FluxCalibrationResults
    child_cls = None
    generating_task = GcorFluxscale

    def handle(self, result):
        """
        Extract per-field, per-spw Stokes I flux densities and QA scores.

        :param result: the FluxCalibrationResults to process
        :return: dict of extracted values
        """
        prefix = get_prefix(result)

        extracted = OrderedDict()
        for field_id, field_measurements in result.measurements.items():
            for measurement in field_measurements:
                stokes_i = measurement.I.to_units(FluxDensityUnits.JANSKY)
                extracted['{}.field_{}.spw_{}.I'.format(prefix, field_id, measurement.spw_id)] = str(stokes_i)

        extracted.update(extract_qa_score_regression(prefix, result))
        return extracted
class ApplycalRegressionExtractor(RegressionExtractor):
    """
    Regression test result extractor for applycal tasks.

    The extracted values are the number of flagged rows before and after the
    task, both overall and per scan, plus the task's QA scores.
    """
    result_cls = ApplycalResults
    child_cls = None
    generating_task = IFApplycal

    def handle(self, result):
        """
        Extract flagging totals and QA scores from an applycal result.

        :param result: the ApplycalResults to process
        :return: dict of extracted values
        """
        prefix = get_prefix(result)

        summaries = {summary['name']: summary for summary in result.summaries}
        before_summary = summaries['before']
        after_summary = summaries['applycal']

        extracted = OrderedDict()
        extracted['{}.num_rows_flagged.before'.format(prefix)] = int(before_summary['flagged'])
        extracted['{}.num_rows_flagged.after'.format(prefix)] = int(after_summary['flagged'])

        for scan_id, scan_stats in before_summary['scan'].items():
            extracted['{}.scan_{}.num_rows_flagged.before'.format(prefix, scan_id)] = int(scan_stats['flagged'])
        for scan_id, scan_stats in after_summary['scan'].items():
            extracted['{}.scan_{}.num_rows_flagged.after'.format(prefix, scan_id)] = int(scan_stats['flagged'])

        extracted.update(extract_qa_score_regression(prefix, result))
        return extracted
def get_prefix(result):
    """
    Construct the key prefix 's<stage>.<casa task>.<vis>' used to namespace
    the values extracted from a result.

    :param result: the task result to derive the prefix from
    :return: the prefix string
    """
    basename = os.path.basename(result.inputs['vis'])
    vis_root, _ = os.path.splitext(basename)
    casa_task = task_registry.get_casa_task(result.task)
    return 's{}.{}.{}'.format(result.stage_number, casa_task, vis_root)
def extract_qa_score_regression(prefix, result):
    """
    Extract the QA metric values and QA scores from a result's QA score pool.

    :param prefix: key prefix for the extracted entries
    :param result: the task result holding QA scores in result.qa.pool
    :return: dict of '<prefix>.qa.metric.<name>' / '<prefix>.qa.score.<name>'
        entries
    """
    extracted = OrderedDict()
    for qa_score in result.qa.pool:
        origin = qa_score.origin
        # sanitise the metric name for use in a dotted key: strip everything
        # except letters, digits and whitespace, then collapse each
        # whitespace run to a single dash
        sanitised = re.sub(r"\s+", '-', re.sub(r"[^\w\s]", '', origin.metric_name))
        extracted['{}.qa.metric.{}'.format(prefix, sanitised)] = origin.metric_score
        extracted['{}.qa.score.{}'.format(prefix, sanitised)] = qa_score.score
    return extracted
def extract_regression_results(context):
    """
    Extract regression test values from every result accepted into the
    pipeline context.

    :param context: the pipeline context
    :return: a list of 'key=value' strings, one per extracted value, in
        processing order
    :raises ValueError: if two results produce the same regression key
    """
    unified = OrderedDict()
    for results_proxy in context.results:
        results = results_proxy.read()
        unified = union(unified, registry.handle(results))
    return ['{}={}'.format(k, v) for k, v in unified.items()]
# TODO enable runtime comparisons?