Source code for pipeline.infrastructure.renderer.regression

"""
The regression module contains base classes and plugin registry for the
pipeline's regression test value extractor framework.

This module contains two classes:

    * RegressionExtractor: the base class for extractor plug-ins
    * RegressionExtractorRegistry: the registry and manager for plug-ins

Tasks provide and register their own extractors that each extends
RegressionExtractor. These extractor plug-ins analyse the results of a task,
extracting pertinent values and writing them to a dict. The keys of the
output dict identify the value; the values of the dict are the extracted
values themselves.

The regression test framework is activated whenever a Results instance is
accepted into the pipeline context. The framework operates by calling
is_handler_for(result) on each registered RegressionExtractor, passing it the
accepted Results instance for inspection. Extractors that claim to handle the
Result are given the Result for processing. In this step, the framework calls
RegressionExtractor.handle(result), the method overridden by the task-specific
extractor.
"""
import abc
import collections
import collections.abc
import os.path
import re
from collections import OrderedDict
from typing import Dict

from pipeline.domain.measures import FluxDensityUnits
from pipeline.h.tasks.applycal.applycal import ApplycalResults
from pipeline.h.tasks.common.commonfluxresults import FluxCalibrationResults
from pipeline.hif.tasks.applycal.ifapplycal import IFApplycal
from pipeline.hifa.tasks.fluxscale.gcorfluxscale import GcorFluxscale
from pipeline.hifa.tasks.gfluxscaleflag.resultobjects import GfluxscaleflagResults

from .. import logging
from ..taskregistry import task_registry

LOG = logging.get_logger(__name__)


[docs]class RegressionExtractor(object, metaclass=abc.ABCMeta): """ RegressionExtractor is the mandatory base class for all regression test result extractors. """ # the Results class this handler is expected to handle result_cls = None # if result_cls is a list, the type of classes it is expected to contain child_cls = None # the task class that generated the results, or None if it should handle # all results of this type regardless of which task generated it generating_task = None
[docs] def is_handler_for(self, result): """ Return True if this RegressionExtractor can process the Result. :param result: the task Result to inspect :return: True if the Result can be processed """ # if the result is not a list or the expected results class, # return False if not isinstance(result, self.result_cls): return False # this is the expected class and we weren't expecting any # children, so we should be able to handle the result if self.child_cls is None and (self.generating_task is None or result.task is self.generating_task): return True try: if all([isinstance(r, self.child_cls) and (self.generating_task is None or r.task is self.generating_task) for r in result]): return True return False except: # catch case when result does not have a task attribute return False
[docs] @abc.abstractmethod def handle(self, result) -> Dict[str, float]: """ This method should return a dict of {'applycal.new_flags.science': 0.34, 'applycal.new_flags.bandpass': 0.53} :param result: :return: """ raise NotImplemented
[docs]class RegressionExtractorRegistry(object): """ The registry and manager of the regression result extractor framework. The responsibility of the RegressionResultRegistry is to pass Results to RegressionExtractors that can handle them. """ def __init__(self): self.__plugins_loaded = False self.__handlers = []
[docs] def add_handler(self, handler): task = handler.generating_task.__name__ if handler.generating_task else 'all' child_name = '' if hasattr(handler.child_cls, '__name__'): child_name = handler.child_cls.__name__ elif isinstance(handler.child_cls, collections.Iterable): child_name = str([x.__name__ for x in handler.child_cls]) container = 's of %s' % child_name s = '{}{} results generated by {} tasks'.format(handler.result_cls.__name__, container, task) LOG.debug('Registering {} as new regression result handler for {}'.format(handler.__class__.__name__, s)) self.__handlers.append(handler)
[docs] def handle(self, result): if not self.__plugins_loaded: for plugin_class in RegressionExtractor.__subclasses__(): self.add_handler(plugin_class()) self.__plugins_loaded = True # this is the list which will contain extracted values tuples extracted = {} # Process leaf results first if isinstance(result, collections.Iterable): for r in result: d = self.handle(r) extracted = union(extracted, d) # process the group-level results. for handler in self.__handlers: if handler.is_handler_for(result): LOG.debug('{} extracting regression results for {}'.format(handler.__class__.__name__, result.__class__.__name__)) d = handler.handle(result) extracted = union(extracted, d) return extracted
[docs]def union(d1: dict, d2: dict): """ Return the union of two dicts, raising an exception if duplicate keys are detected in the input dicts. """ intersection = key_intersection(d1, d2) if intersection: raise ValueError('Regression keys are duplicated: {}'.format(intersection)) # dict keys and values should be strings, so ok to shallow copy # OrderedDict is used to store results in processing order. u = OrderedDict(d1) u.update(d2) return u
[docs]def key_intersection(d1: dict, d2: dict): """ Compare keys of two dicts, returning duplicate keys. """ d1_keys = set(d1.keys()) d2_keys = set(d2.keys()) return d1_keys.intersection(d2_keys)
registry = RegressionExtractorRegistry()
[docs]class FluxcalflagRegressionExtractor(RegressionExtractor): """ Regression test result extractor for hifa_gfluxscaleflag. The extracted values are: - the number of flagged rows before this task - the number of flagged rows after this task """ result_cls = GfluxscaleflagResults child_cls = None
[docs] def handle(self, result): prefix = get_prefix(result) summaries_by_name = {s['name']: s for s in result.cafresult.summaries} num_flags_before = summaries_by_name['before']['flagged'] if 'after' in summaries_by_name: num_flags_after = summaries_by_name['after']['flagged'] else: num_flags_after = num_flags_before d = OrderedDict() d['{}.num_rows_flagged.before'.format(prefix)] = int(num_flags_before) d['{}.num_rows_flagged.after'.format(prefix)] = int(num_flags_after) qa_entries = extract_qa_score_regression(prefix, result) d.update(qa_entries) return d
[docs]class GcorFluxscaleRegressionExtractor(RegressionExtractor): """ Regression test result extractor for hifa_gcorfluxscale The extracted values are: - Stokes I for each field and spw, in Jy """ result_cls = FluxCalibrationResults child_cls = None generating_task = GcorFluxscale
[docs] def handle(self, result): prefix = get_prefix(result) d = OrderedDict() for field_id, measurements in result.measurements.items(): for m in measurements: key = '{}.field_{}.spw_{}.I'.format(prefix, field_id, m.spw_id) d[key] = str(m.I.to_units(FluxDensityUnits.JANSKY)) qa_entries = extract_qa_score_regression(prefix, result) d.update(qa_entries) return d
[docs]class ApplycalRegressionExtractor(RegressionExtractor): """ Regression test result extractor for applycal tasks. """ result_cls = ApplycalResults child_cls = None generating_task = IFApplycal
[docs] def handle(self, result): prefix = get_prefix(result) summaries_by_name = {s['name']: s for s in result.summaries} num_flags_before = summaries_by_name['before']['flagged'] num_flags_after = summaries_by_name['applycal']['flagged'] d = OrderedDict() d['{}.num_rows_flagged.before'.format(prefix)] = int(num_flags_before) d['{}.num_rows_flagged.after'.format(prefix)] = int(num_flags_after) flag_summary_before = summaries_by_name['before'] for scan_id, v in flag_summary_before['scan'].items(): d['{}.scan_{}.num_rows_flagged.before'.format(prefix, scan_id)] = int(v['flagged']) flag_summary_after = summaries_by_name['applycal'] for scan_id, v in flag_summary_after['scan'].items(): d['{}.scan_{}.num_rows_flagged.after'.format(prefix, scan_id)] = int(v['flagged']) qa_entries = extract_qa_score_regression(prefix, result) d.update(qa_entries) return d
[docs]def get_prefix(result): vis, _ = os.path.splitext(os.path.basename(result.inputs['vis'])) casa_task = task_registry.get_casa_task(result.task) prefix = 's{}.{}.{}'.format(result.stage_number, casa_task, vis) return prefix
[docs]def extract_qa_score_regression(prefix, result): d = OrderedDict() for qa_score in result.qa.pool: metric_name = qa_score.origin.metric_name # Remove all non-word characters (everything except numbers and letters) metric_name = re.sub(r"[^\w\s]", '', metric_name) # Replace all runs of whitespace with a single dash metric_name = re.sub(r"\s+", '-', metric_name) metric_score = qa_score.origin.metric_score score_value = qa_score.score d['{}.qa.metric.{}'.format(prefix, metric_name)] = metric_score d['{}.qa.score.{}'.format(prefix, metric_name)] = score_value return d
[docs]def extract_regression_results(context): unified = OrderedDict() for results_proxy in context.results: results = results_proxy.read() unified = union(unified, registry.handle(results)) # return unified return ['{}={}'.format(k, v) for k, v in unified.items()]
# TODO enable runtime comparisons?