Source code for pipeline.h.tasks.exportdata.aqua

"""
Prototype pipeline AQUA report generator


Definitions
    Metrics are physical quantities, e.g. phase rms improvement resulting from
    WVR calibration., % data flagged, etc

    Scores are numbers between 0.0 and 1.0 derived from metrics.  Not all
    metrics currently derived in the pipeline are scored.

Structure
    The report contains
        A project structure section.
        A QA summary section.
        A per stage QA section.
        A per topic QA section.

Issues with the Original Schema / Current Pipeline Design
    The per ASDM dimension was ignored.

    The multiple metrics / scores per stage and / or per ASDM
    dimension was ignored.

    For stages with single scores / metrics and multiple ASDMs the
    current report generator selects the MS with the worst metric and
    reports that value. This metric by definition corresponds to
    the lowest score.

    Stages which generate multiple scores / metrics and multiple
    ASDMs are currently dealt with on an ad hoc basis.

    The scores and metrics are noew stored with the stage results.

    Metrics may have units information. They may be encoded as
    CASA quanta strings if appropriate.

Future Technical Solutions
    Suggestions
    Add a toAqua method to the base results class which returns a
    list of metrics for export. Pass these to the QA classes
    for scoring.

    Add the euivalent of a  toAqua registration method similar to what is
    done with QA handlers already
"""
import datetime
import itertools
import operator
import os
from typing import List
import xml.etree.cElementTree as ElementTree
from xml.dom import minidom

import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.renderer.qaadapter as qaadapter
import pipeline.infrastructure.utils as utils
from pipeline import environment
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure.pipelineqa import QAScore

LOG = logging.get_logger(__name__)

# constant for an undefined value
UNDEFINED = 'Undefined'

# this holds all QA-metric-to-XML export functions
_AQUA_REGISTRY = set()

# Maps task name to function that gets sensitivity dict from result
TASK_NAME_TO_SENSITIVITY_EXPORTER = {}


[docs]def register_aqua_metric(fn): """ Register a 'QA metric to XML' conversion function. This function can also be used as a decorator. :param fn: function to register :return: """ _AQUA_REGISTRY.add(fn) return fn
[docs]class AquaXmlGenerator(object): """ Class to create the XML for an AQUA pipeline report. """
[docs] def get_report_xml(self, context): """ Create and return the AQUA report XML for the results stored in a context. :param context: pipeline context to parse :return: root XML Element of AQUA report :rtype: xml.etree.cElementTree.Element """ # read in all results in the context all_results = [r.read() for r in context.results] # the imaging tasks don't wrap themselves in a ResultsList. Until they # do, we have to fake that here. for idx, r in enumerate(all_results): try: iter(r) except TypeError: # temporary import for expedience # TODO make this a utility function import pipeline.infrastructure.renderer.htmlrenderer as htmlrenderer all_results[idx] = htmlrenderer.wrap_in_resultslist(r) # Initialize root = ElementTree.Element('PipelineAquaReport') # Construct the project structure element root.append(self.get_project_structure(context)) # Construct the QA summary element root.append(self.get_qa_summary(context)) # Construct the per pipeline stage elements root.append(self.get_per_stage_qa(context, all_results)) # Construct the topics elements. root.append(self.get_topics_qa(context, all_results)) return root
[docs] def get_project_structure(self, context): """ Get the project structure element. Given the current data flow it is unclear how the report generator generator will acquire the entity id of the original processing request. The processing procedure name is known but not yet passed to the pipeline processing request. :param context: pipeline context :return: XML for project structure :rtype: xml.etree.cElementTree.Element """ root = ElementTree.Element('ProjectStructure') ElementTree.SubElement(root, 'ProposalCode').text = context.project_summary.proposal_code ElementTree.SubElement(root, 'ProcessingRequestEntityId').text = UNDEFINED if context.project_structure.recipe_name == 'Undefined': ElementTree.SubElement(root, 'ProcessingProcedure').text = UNDEFINED else: ElementTree.SubElement(root, 'ProcessingProcedure').text = context.project_structure.recipe_name return root
[docs] def get_qa_summary(self, context): """ Get the AQUA summary XML element. :param context: pipeline context :return: XML summarising execution :rtype: xml.etree.cElementTree.Element """ root = ElementTree.Element('QaSummary') # Generate the report date now = datetime.datetime.utcnow() ElementTree.SubElement(root, 'ReportDate').text = now.strftime('%Y-%m-%d %H:%M:%S') # Processing time exec_start = context.results[0].read().timestamps.start exec_end = context.results[-1].read().timestamps.end # remove unnecessary precision for execution duration dt = exec_end - exec_start exec_duration = datetime.timedelta(days=dt.days, seconds=dt.seconds) ElementTree.SubElement(root, 'ProcessingTime').text = str(exec_duration) # Software versions ElementTree.SubElement(root, 'CasaVersion').text = environment.casa_version_string ElementTree.SubElement(root, 'PipelineVersion').text = environment.pipeline_revision # Score for the complete pipeline run # NB. the final pipeline score is not yet available. ElementTree.SubElement(root, 'FinalScore').text = UNDEFINED return root
[docs] def get_per_stage_qa(self, context, all_results): """ Get the XML for all stages. :param context: pipeline context :param all_results: all Results for this pipeline run :return: XML for all stages :rtype: xml.etree.cElementTree.Element """ # Get the stage summary element. xml_root = ElementTree.Element('QaPerStage') ordered_results = sorted(all_results, key=operator.attrgetter('stage_number')) for stage_result in ordered_results: # Create the generic stage element stage_name, stage_score = _get_pipeline_stage_and_score(stage_result) stage_element = ElementTree.Element('Stage', Number=str(stage_result.stage_number), Name=stage_name, Score=str(stage_score)) all_scores = stage_result.qa.pool # calculate which items have specific renderers and which require # procesing by the generic renderer needs_specific = [qa_score for qa_score in all_scores if any([fn.handles(qa_score.origin.metric_name) for fn in _AQUA_REGISTRY])] needs_generic = [qa_score for qa_score in all_scores if not any([fn.handles(qa_score.origin.metric_name) for fn in _AQUA_REGISTRY])] # create a pseudo registry for the generic XML generator generic_registry = {GenericMetricXmlGenerator()} # generate XML for those with a specific renderer specific_elements = self._get_xml_for_qa_scores(needs_specific, _AQUA_REGISTRY) generic_elements = self._get_xml_for_qa_scores(needs_generic, generic_registry) stage_element.extend(specific_elements) # stage_element.extend(generic_elements) xml_root.append(stage_element) return xml_root
def _get_xml_for_qa_scores(self, items, registry): """ Generate the XML elements for a list of QA scores. :param items: list of QAScores :param registry: list of XML generator functions :return: list of XML elements :rtype: list of xml.etree.ElementTree """ # group scores into a {<metric name>: [<QAScore, ...>]} dict metric_to_scores = {} keyfunc = operator.attrgetter('origin.metric_name') s = sorted(list(items), key=keyfunc) for k, g in itertools.groupby(s, keyfunc): metric_to_scores[k] = list(g) # let each generator process the QA scores it can handle, accumulating # the XML as we go elements = [] for metric_name, scores in metric_to_scores.items(): xml = [fn(scores) for fn in registry if fn.handles(metric_name)] elements.extend(utils.flatten(xml)) return elements
[docs] def get_topics_qa(self, context, all_results): """ Get the XML for all results, divided into sections by topic. :param context: pipeline context :param all_results: all Results for this pipeline run :return: XML for topics :rtype: xml.etree.cElementTree.Element """ # Set the top level topics element. root = ElementTree.Element('QaPerTopic') # Add the data topic topic = qaadapter.registry.get_dataset_topic() dataset_results = [r for r in all_results if topic.handles_result(r)] root.append(self.get_dataset_topic(context, dataset_results)) # Add the flagging topic topic = qaadapter.registry.get_flagging_topic() flagging_results = [r for r in all_results if topic.handles_result(r)] root.append(self.get_flagging_topic(context, flagging_results)) # Add the calibration topic topic = qaadapter.registry.get_calibration_topic() calibration_results = [r for r in all_results if topic.handles_result(r)] root.append(self.get_calibration_topic(context, calibration_results)) # Add the imaging topic topic = qaadapter.registry.get_imaging_topic() imaging_results = [r for r in all_results if topic.handles_result(r)] root.append(self.get_imaging_topic(context, imaging_results)) return root
[docs] def get_calibration_topic(self, context, topic_results): """ Get the XML for the calibration topic. :param context: pipeline context :param topic_results: list of Results for this topic :return: XML for calibration topic :rtype: xml.etree.cElementTree.Element """ return self._xml_for_topic('Calibration', context, topic_results)
[docs] def get_dataset_topic(self, context, topic_results): """ Get the XML for the dataset topic. :param context: pipeline context :param topic_results: list of Results for this topic :return: XML for dataset topic :rtype: xml.etree.cElementTree.Element """ return self._xml_for_topic('Dataset', context, topic_results)
[docs] def get_flagging_topic(self, context, topic_results): """ Get the XML for the flagging topic. :param context: pipeline context :param topic_results: list of Results for this topic :return: XML for flagging topic :rtype: xml.etree.cElementTree.Element """ return self._xml_for_topic('Flagging', context, topic_results)
[docs] def get_imaging_topic(self, context, topic_results): """ Get the XML for the imaging topic. :param context: pipeline context :param topic_results: list of Results for this topic :return: XML for imaging topic :rtype: xml.etree.cElementTree.Element """ return self._xml_for_topic('Imaging', context, topic_results)
def _xml_for_topic(self, topic_name, context, topic_results): # the overall topic score is defined as the minimum score of all # representative scores for each task in that topic, which themselves # are the minimum of the scores for that task try: min_score = min([r.qa.representative for r in topic_results if r.qa.representative.score is not None], key=operator.attrgetter('score')) score = str(min_score.score) except ValueError: # empty list score = UNDEFINED xml_root = ElementTree.Element(topic_name, Score=score) topic_xml = self.get_per_stage_qa(context, topic_results) xml_root.extend(topic_xml) return xml_root
[docs]def export_to_disk(report, filename): """ Convert an XML document to a nicely formatted XML string and save it in a file. """ xmlstr = ElementTree.tostring(report, 'utf-8') # Reformat it to prettyprint style reparsed = minidom.parseString(xmlstr) reparsed_xmlstr = reparsed.toprettyxml(indent=' ') # Save it to a file. with open(filename, 'w') as aquafile: aquafile.write(reparsed_xmlstr)
[docs]def vis_to_asdm(vispath): """ Get the expected ASDM name from the path of a measurement set. :param vispath: path to convert :return: expected name of ASDM for MS """ return os.path.splitext(os.path.basename(vispath))[0]
[docs]def xml_generator_for_metric(qa_label, value_spec): """ Return a function that converts a matching QAScore to XML. :param qa_label: QA metric label to match :param value_spec: string format spec for how to format metric value :return: function """ # We don't (yet) allow % in the output XML, even when it represents a # percentage if value_spec.endswith('%}'): value_formatter = _create_trimmed_formatter(value_spec, 1) else: value_formatter = _create_value_formatter(value_spec) # return LowestScoreMetricXmlGenerator(qa_label, formatters={'Value': value_formatter}) return MetricXmlGenerator(qa_label, formatters={'Value': value_formatter})
[docs]class MetricXmlGenerator(object): """ Creates a AQUA report XML element for QA scores. """ def __init__(self, metric_name, formatters=None): """ The constructor accepts an optional dict of string formatters: functions that accept a string and return a formatted string. If this argument is not supplied, the default formatter keys and formatter functions applied will be: 'Name': convert to string 'Value': convert to string 'Asdm': return basename minus extension 'QaScore': convert to string :param metric_name: metric to match :param formatters: (optional) dict string formatters """ self.metric_name = metric_name # set default attribute formatters before updating with user overrides self.attr_formatters = { 'Name': str, 'Value': str, 'Asdm': vis_to_asdm, 'QaScore': str, 'Session': str, } if formatters: self.attr_formatters.update(formatters) def __call__(self, qa_scores: List[QAScore]) -> List[ElementTree.ElementTree]: scores_to_process = self.filter(qa_scores) return [self.to_xml(score) for score in scores_to_process]
[docs] def handles(self, metric_name: str) -> bool: """ Returns True if this class can generate XML for the given metric. :param metric_name: name of metric :return: True if metric handled by this class """ return metric_name == self.metric_name
[docs] def filter(self, qa_scores: List[QAScore]) -> List[QAScore]: """ Reduce a list of entries to those entries that require XML to be generated. :param qa_scores: list of QAScores :return: list of QAScores """ return qa_scores
[docs] def to_xml(self, qa_score: QAScore) -> ElementTree.ElementTree: """ Return the XML representation of a QA score and associated metric. :param qa_score: QA score to convert :return: XML element :rtype: xml.etree.ElementTree.Element """ if not qa_score: return None origin = qa_score.origin score_value = str(qa_score.score) init_args = dict( Name=self.attr_formatters['Name'](origin.metric_name), Value=self.attr_formatters['Value'](origin.metric_score), QaScore=self.attr_formatters['QaScore'](score_value) ) target_asdms = qa_score.applies_to.vis if target_asdms: init_args['Asdm'] = ','.join([self.attr_formatters['Asdm'](a) for a in target_asdms]) target_session = qa_score.applies_to.session if target_session: init_args['Session'] = ','.join([self.attr_formatters['Session'](s) for s in target_session]) return ElementTree.Element('Metric', **init_args)
[docs]class LowestScoreMetricXmlGenerator(MetricXmlGenerator): """ Metric XML Generator that only returns XML for the lowest QA score that it handles. """ def __init__(self, metric_name, formatters=None): super(LowestScoreMetricXmlGenerator, self).__init__(metric_name, formatters)
[docs] def filter(self, qa_scores): handled = [(vis, qa_score) for vis, qa_score in qa_scores if self.handles(qa_score.origin.metric_name)] if not handled: return [] lowest = min(handled, key=lambda vis_qa_score: vis_qa_score[1].score) return [lowest]
[docs]class GenericMetricXmlGenerator(MetricXmlGenerator): """ Metric XML Generator that processes any score it is given, formatting the metric value to 3dp. """ def __init__(self): # format all processed entries to 3dp formatters = {'Value': _create_value_formatter('{:0.3f}')} super(GenericMetricXmlGenerator, self).__init__('Generic metric', formatters)
[docs] def handles(self, _): return True
def _create_trimmed_formatter(format_spec, trim=0): """ Create a function that formats values as a percent. :param format_spec: string format specification to apply :param trim: number of characters to trim :return: function """ g = _create_value_formatter(format_spec) def f(val): val = g(val) if val == UNDEFINED: return UNDEFINED else: return val[:-trim] return f def _create_value_formatter(format_spec): """ Create a function that applies a string format spec. This function return a function that accepts one argument and returns the string formatted according to the given string format specification. If the argument cannot be formatted, the default 'undefined' string will be returned. This is used internally to create a set of formatting functions that all exhibit the same behaviour, whereby 'Undefined' is returned on errors. :param format_spec: string format specification to apply :return: function """ def f(val): try: return format_spec.format(val) except (ValueError, TypeError): # Handle lists of metrics and other possible flavors with a string # representation. return str(val) except: return UNDEFINED return f def _get_pipeline_stage_and_score(result): """ Get the CASA equivalent task name which is stored by the infrastructure as <task_name> (<arg1> = <value1>, ...) """ stage_name = result.pipeline_casa_task.split('(')[0] score = result.qa.representative.score return stage_name, score
[docs]def sensitivity_xml_for_stages(context, results, name=''): """ Get the XML for all sensitivities reported by all tasks. :param context: pipeline context :param results: all results for the imaging topic :param name: the name of per stage tag (optional) :return: XML for sensitivities :rtype: xml.etree.cElementTree.Element """ xml_root = ElementTree.Element('ImageSensitivity') ordered_results = sorted(results, key=operator.attrgetter('stage_number')) for stage_result in ordered_results: # Create the generic stage element stage_name, stage_score = _get_pipeline_stage_and_score(stage_result) for task_name, exporter in TASK_NAME_TO_SENSITIVITY_EXPORTER.items(): if stage_name == task_name: stage_xml = xml_for_sensitivity_stage(context, stage_result, exporter, name) xml_root.append(stage_xml) return xml_root
[docs]def xml_for_sensitivity_stage(context, stage_results, exporter, name): """ Translate the sensitivity dictionaries contained in a task result to XML. :param context: pipeline context :param stage_results: hifa_preimagecheck result :param exporter: function that returns a list of sensitivity dicts from the result :param name: the name of per stage tag (optional) :return: XML for all sensitivities reported by the result stage :rtype: xml.etree.cElementTree.Element """ stage_name, stage_score = _get_pipeline_stage_and_score(stage_results) tagname = name if name != '' else 'SensitivityEstimates' xml_root = ElementTree.Element(tagname, Origin=stage_name, Number=str(stage_results.stage_number), Score=str(stage_score)) sensitivity_dicts = exporter(stage_results) for d in sensitivity_dicts: ms_xml = xml_for_sensitivity(d) xml_root.append(ms_xml) return xml_root
[docs]def xml_for_sensitivity(d): """ Return the XML representation for a sensitivity dictionary. :param d: sensitivity dict :return: XML element :rtype: xml.etree.cElementTree.Element """ qa = casa_tools.quanta def value(quanta): return str(qa.getvalue(quanta)[0]) try: bandwidth = qa.quantity(d['bandwidth']) bandwidth_hz = value(qa.convert(bandwidth, 'Hz')) if bandwidth_hz == '0.0': bandwidth_hz = 'N/A' except: bandwidth_hz = 'N/A' try: effective_bw = qa.quantity(d['effective_bw']) effective_bw_hz = value(qa.convert(effective_bw, 'Hz')) if effective_bw_hz == '0.0': effective_bw_hz = 'N/A' except: effective_bw_hz = 'N/A' try: major = qa.quantity(d['beam']['major']) major_arcsec = value(qa.convert(major, 'arcsec')) if major_arcsec == '0.0': major_arcsec = 'N/A' except: major_arcsec = 'N/A' try: minor = qa.quantity(d['beam']['minor']) minor_arcsec = value(qa.convert(minor, 'arcsec')) if minor_arcsec == '0.0': minor_arcsec = 'N/A' except: minor_arcsec = 'N/A' try: cell_x = qa.quantity(d['cell'][0]) cell_x_arcsec = value(qa.convert(cell_x, 'arcsec')) if cell_x_arcsec == '0.0': cell_x_arcsec = 'N/A' except: cell_x_arcsec = 'N/A' try: cell_y = qa.quantity(d['cell'][1]) cell_y_arcsec = value(qa.convert(cell_y, 'arcsec')) if cell_y_arcsec == '0.0': cell_y_arcsec = 'N/A' except: cell_y_arcsec = 'N/A' try: positionangle = qa.quantity(d['beam']['positionangle']) positionangle_deg = value(qa.convert(positionangle, 'deg')) # Do not check for 0.0. Could be a real value. except: positionangle_deg = 'N/A' try: sensitivity = qa.quantity(d['sensitivity']) sensitivity_jy_per_beam = value(qa.convert(sensitivity, 'Jy/beam')) if sensitivity_jy_per_beam == '0.0': sensitivity_jy_per_beam = 'N/A' except: sensitivity_jy_per_beam = 'N/A' xml = ElementTree.Element('Sensitivity', Array=d['array'], BandwidthHz=bandwidth_hz, EffectiveBandwidthHz=effective_bw_hz, BeamMajArcsec=major_arcsec, BeamMinArcsec=minor_arcsec, BeamPosAngDeg=positionangle_deg, BwMode=d['bwmode'], CellXArcsec=cell_x_arcsec, CellYArcsec=cell_y_arcsec, Field=d['field'], Robust=str(d.get('robust', '')), UVTaper=str(d.get('uvtaper', '')), SensitivityJyPerBeam=sensitivity_jy_per_beam, MsSpwId=d['spw'], ) return xml