# Source code for pipeline.h.tasks.importdata.qa

import collections
import collections.abc
import datetime
import os

import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.pipelineqa as pqa
import pipeline.infrastructure.utils as utils
import pipeline.qa.scorecalculator as qacalc
from pipeline.infrastructure import casa_tasks, casa_tools
from . import importdata
from ..exportdata import aqua

LOG = logging.get_logger(__name__)


class ImportDataQAHandler(pqa.QAPlugin):
    """
    QA plugin that inspects a single ImportDataResults, scoring the imported
    measurement sets for missing intents, pre-existing MODEL_DATA columns,
    non-pristine HISTORY entries, session contiguity, ephemeris coordinates,
    and science spw naming.
    """
    result_cls = importdata.ImportDataResults
    child_cls = None

    def handle(self, context, result):
        """
        Run all QA checks against the measurement sets in *result* and extend
        the result's QA score pool with the outcome.
        """
        score1 = self._check_intents(result.mses)
        # Relabel the origin so the AQUA exporter registered for
        # 'MissingIntentsMark' picks this score up.
        new_origin = pqa.QAOrigin(metric_name='MissingIntentsMark',
                                  metric_score=score1.origin.metric_score,
                                  metric_units='Measure of missing scan intents')
        score1.origin = new_origin

        # Logs warnings only; deliberately contributes no QA score.
        self._check_flagged_calibrator_data(result.mses)
        score3 = self._check_model_data_column(result.mses)
        score4 = self._check_history_column(result.mses, result.inputs)

        LOG.todo('How long can MSes be separated and still be considered '
                 'contiguous?')
        score5 = self._check_contiguous(result.mses)
        score6 = self._check_coordinates(result.mses)
        score7 = self._check_science_spw_names(result.mses,
                                               context.observing_run.virtual_science_spw_names)

        LOG.todo('ImportData QA: missing source.xml and calibrator unknown to '
                 'CASA')
        LOG.todo('ImportData QA: missing BDFs')

        scores = [score1, score3, score4, score5, score6, score7]
        result.qa.pool.extend(scores)

    @staticmethod
    def _check_contiguous(mses):
        """
        Check whether observations are contiguous, i.e. whether consecutive
        measurement sets are separated by less than the tolerance.
        """
        tolerance = datetime.timedelta(hours=1)
        return qacalc.score_contiguous_session(mses, tolerance=tolerance)

    @staticmethod
    def _check_model_data_column(mses):
        """
        Check whether any of the measurement sets contain a MODEL_DATA
        column, complaining if present, and returning an appropriate QA
        score.
        """
        bad_mses = []
        for ms in mses:
            with casa_tools.TableReader(ms.name) as table:
                if 'MODEL_DATA' in table.colnames():
                    bad_mses.append(ms)
        return qacalc.score_ms_model_data_column_present(mses, bad_mses)

    @staticmethod
    def _check_history_column(mses, inputs):
        """
        Check whether any of the measurement sets has entries in the history
        column, potentially signifying a non-pristine data set.
        """
        bad_mses = []
        createmms = False if 'createmms' not in inputs else inputs['createmms']

        # HISTORY origins that are expected even in a pristine data set.
        # 'partition' additionally appears when createmms mode is turned on.
        allowed_origins = {'importasdm', 'im::calcuvw()'}
        if createmms:
            allowed_origins.add('partition')

        for ms in mses:
            history_table = os.path.join(ms.name, 'HISTORY')
            with casa_tools.TableReader(history_table) as table:
                if table.nrows() != 0:
                    origin_col = table.getcol('ORIGIN')
                    # any unexpected origin marks the MS as non-pristine
                    if any(origin not in allowed_origins for origin in origin_col):
                        bad_mses.append(ms)

        return qacalc.score_ms_history_entries_present(mses, bad_mses)

    @staticmethod
    def _check_flagged_calibrator_data(mses):
        """
        Check how much calibrator data has been flagged in the given
        measurement sets, complaining if the fraction of flagged data exceeds
        a threshold.
        """
        LOG.todo('What fraction of flagged calibrator data should result in '
                 'a warning?')
        threshold = 0.10

        # which intents should be checked for flagged data
        calibrator_intents = {'AMPLITUDE', 'BANDPASS', 'PHASE'}

        # flag for whether to print 'all scans ok' message at end
        all_ok = True

        for ms in mses:
            bad_scans = collections.defaultdict(list)

            # inspect each MS with flagdata, capturing the dictionary
            # describing the number of flagged rows
            flagdata_task = casa_tasks.flagdata(vis=ms.name, mode='summary')
            flagdata_result = flagdata_task.execute(dry_run=False)

            for intent in calibrator_intents:
                # collect scans with the calibrator intent
                calscans = [scan for scan in ms.scans if intent in scan.intents]
                for scan in calscans:
                    scan_id = str(scan.id)
                    # read the number of flagged/total integrations from the
                    # flagdata results dictionary
                    flagged = flagdata_result['scan'][scan_id]['flagged']
                    total = flagdata_result['scan'][scan_id]['total']
                    # guard against empty scans to avoid ZeroDivisionError
                    if total and flagged / total >= threshold:
                        bad_scans[intent].append(scan)
                        all_ok = False

            for intent in bad_scans:
                scan_ids = [scan.id for scan in bad_scans[intent]]
                # == rather than 'is': identity comparison with an int literal
                # is implementation-dependent
                multi = len(scan_ids) != 1
                # log something like 'More than 12% of PHASE scans 1, 2, and 7
                # in vla.ms are flagged'
                LOG.warning('More than %s%% of %s scan%s %s in %s %s flagged'
                            '' % (threshold * 100.0,
                                  intent,
                                  's' if multi else '',
                                  utils.commafy(scan_ids, quotes=False),
                                  ms.basename,
                                  'are' if multi else 'is'))

        if all_ok:
            LOG.info('All %s scans in %s have less than %s%% flagged '
                     'data' % (utils.commafy(calibrator_intents, False),
                               utils.commafy([ms.basename for ms in mses]),
                               threshold * 100.0))

    def _check_intents(self, mses):
        """
        Check each measurement set in the list for a set of required intents.

        TODO Should we terminate execution on missing intents?
        """
        return qacalc.score_missing_intents(mses)

    def _check_coordinates(self, mses):
        """
        Check each measurement set in the list for zero valued coordinates.
        """
        return qacalc.score_ephemeris_coordinates(mses)

    def _check_science_spw_names(self, mses, virtual_science_spw_names):
        """
        Check science spw names against the virtual science spw names.
        """
        return qacalc.score_science_spw_names(mses, virtual_science_spw_names)
class ImportDataListQAHandler(pqa.QAPlugin):
    """
    QA plugin that handles a list of ImportDataResults, collating the QA
    scores of each child result into the list result's own score pool.
    """
    # collections.abc.Iterable: the bare collections.Iterable alias was
    # removed in Python 3.10
    result_cls = collections.abc.Iterable
    child_cls = importdata.ImportDataResults

    def handle(self, context, result):
        """
        Collate the QAScores from each child result, pulling them into our
        own QAScore list.
        """
        collated = utils.flatten([r.qa.pool[:] for r in result])
        result.qa.pool.extend(collated)
# Register an AQUA exporter so QA scores carrying the 'MissingIntentsMark'
# metric are written to the AQUA report XML, formatted to three decimals.
aqua_exporter = aqua.xml_generator_for_metric('MissingIntentsMark', '{:0.3f}')
aqua.register_aqua_metric(aqua_exporter)