Source code for pipeline.hifa.tasks.gaincal.qa

import collections
import os

import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.pipelineqa as pqa
import pipeline.infrastructure.utils as utils
import pipeline.qa.gpcal as gpcal
from pipeline.hif.tasks.gaincal import common

LOG = logging.get_logger(__name__)


[docs]class TimegaincalQAPool(pqa.QAScorePool): score_types = { 'PHASE_SCORE_XY': ('XY_TOTAL', 'X-Y phase deviation'), 'PHASE_SCORE_X2X1': ('X2X1_TOTAL', 'X2-X1 phase deviation') } short_msg = { 'PHASE_SCORE_XY': 'X-Y deviation', 'PHASE_SCORE_X2X1': 'X2-X1 deviation' } def __init__(self, qa_results_dict): super(TimegaincalQAPool, self).__init__() self.qa_results_dict = qa_results_dict
[docs] def update_scores(self, ms, phase_field_ids): try: # # PIPE-365: Revise hifa_timegaincal QA scores for Cy7 # # Remy: Remove consideration of X-Y, X1-Y1 phase vs. time metrics # from QA scoring (keep evaluation for now - I will open a CASR to # revise these evaluations) # # self.pool.extend([self._get_qascore(ms, phase_field_ids, t) for t in self.score_types.iterkeys()]) pass except Exception as e: LOG.error('Score calculation failed: %s' % (e)) else: # We need to long_msg = 'QA metric calculation successful for {}'.format(ms.basename) short_msg = 'QA measured' origin = pqa.QAOrigin(metric_name='timegaincal_qa_calculated', metric_score=1, metric_units='Timegaincal QA metrics calculated') ms_qa_score = pqa.QAScore(score=1.0, longmsg=long_msg, shortmsg=short_msg, vis=ms.basename, origin=origin) self.pool.append(ms_qa_score)
def _get_qascore(self, ms, phase_field_ids, score_type): (total_score, table_name, field_name, ant_name, spw_name) = self._get_total(phase_field_ids, self.score_types[score_type][0]) longmsg = 'Total score for %s is %0.2f (%s field %s %s spw %s)' % ( self.score_types[score_type][1], total_score, ms.basename, field_name, ant_name, spw_name) shortmsg = self.short_msg[score_type] origin = pqa.QAOrigin(metric_name='TimegaincalQAPool', metric_score=total_score, metric_units='Total gpcal QA score') return pqa.QAScore(total_score, longmsg=longmsg, shortmsg=shortmsg, vis=ms.basename, origin=origin) def _get_total(self, phase_field_ids, score_key): # attrs to hold score and QA identifiers total_score = 1.0 total_table_name = None total_field_name = 'N/A' total_ant_name = 'N/A' total_spw_name = 'N/A' for table_name in self.qa_results_dict: qa_result = self.qa_results_dict[table_name] for field_id in phase_field_ids: qa_context_score = qa_result['QASCORES']['SCORES'][field_id][score_key] if qa_context_score['SCORE'] != 'C/C': if qa_context_score['SCORE'] < total_score: total_score = qa_context_score['SCORE'] total_field_name = qa_result['QASCORES']['FIELDS'][qa_context_score['FIELD']] total_ant_name = qa_result['QASCORES']['ANTENNAS'][qa_context_score['ANTENNA']] total_spw_name = qa_result['QASCORES']['SPWS'][qa_context_score['SPW']] total_table_name = table_name return total_score, total_table_name, total_field_name, total_ant_name, total_spw_name
[docs]class TimegaincalQAHandler(pqa.QAPlugin): """ QA handler for an uncontained TimegaincalResult. """ result_cls = common.GaincalResults child_cls = None
[docs] def handle(self, context, result): vis = result.inputs['vis'] ms = context.observing_run.get_ms(vis) phase_field_ids = [field.id for field in ms.get_fields(intent='PHASE')] qa_dir = os.path.join(context.report_dir, 'stage%s' % result.stage_number, 'qa') if not os.path.exists(qa_dir): os.makedirs(qa_dir) qa_results_dict = {} try: for calapp in result.final: solint = utils.get_origin_input_arg(calapp, 'solint') calmode = utils.get_origin_input_arg(calapp, 'calmode') if solint == 'int' and calmode == 'p': qa_results_dict[calapp.gaintable] = gpcal.gpcal(calapp.gaintable) qa_results_dict[calapp.gaintable]['PHASE_FIELDS'] = phase_field_ids result.qa = TimegaincalQAPool(qa_results_dict) result.qa.update_scores(ms, phase_field_ids) except Exception as e: LOG.error('Problem occurred running QA analysis. QA results will not be available for this task') LOG.exception(e)
[docs]class TimegaincalListQAHandler(pqa.QAPlugin): """ QA handler for a list containing TimegaincalResults. """ result_cls = collections.Iterable child_cls = common.GaincalResults
[docs] def handle(self, context, result): # collate the QAScores from each child result, pulling them into our # own QAscore list collated = utils.flatten([r.qa.pool for r in result]) result.qa.pool[:] = collated