Source code for pipeline.hifv.tasks.fluxscale.qa

import collections
import os
import numpy as np

import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.pipelineqa as pqa
import pipeline.infrastructure.utils as utils
import pipeline.qa.scorecalculator as qacalc
from . import Fluxboot2
from . import Fluxboot2Results
from . import solint

LOG = logging.get_logger(__name__)


[docs]class SolintQAHandler(pqa.QAPlugin): result_cls = solint.SolintResults child_cls = None generating_task = solint.Solint
[docs] def handle(self, context, result): # Check for existence of the the target MS. score1 = self._ms_exists(os.path.dirname(result.inputs['vis']), os.path.basename(result.inputs['vis'])) scores = [score1] result.qa.pool.extend(scores)
def _ms_exists(self, output_dir, ms): """ Check for the existence of the target MS """ return qacalc.score_path_exists(output_dir, ms, 'Solint')
[docs]class SolintListQAHandler(pqa.QAPlugin): """ QA handler for a list containing SolintResults. """ result_cls = collections.Iterable child_cls = solint.SolintResults generating_task = solint.Solint
[docs] def handle(self, context, result): # collate the QAScores from each child result, pulling them into our # own QAscore list collated = utils.flatten([r.qa.pool for r in result]) result.qa.pool[:] = collated mses = [r.inputs['vis'] for r in result] longmsg = 'No missing target MS(s) for %s' % utils.commafy(mses, quotes=False, conjunction='or') result.qa.all_unity_longmsg = longmsg
[docs]class Fluxboot2QAHandler(pqa.QAPlugin): result_cls = Fluxboot2Results child_cls = None generating_task = Fluxboot2
[docs] def handle(self, context, result): # Get a QA score based on RMS of the residuals per receiver band and source m = context.observing_run.get_ms(result.inputs['vis']) weblog_results = {} webdicts = {} ms = os.path.basename(result.inputs['vis']) weblog_results[ms] = result.weblog_results # Sort into dictionary collections to prep for table webdicts[ms] = collections.defaultdict(list) for row in sorted(weblog_results[ms], key=lambda p: (p['source'], float(p['freq']))): webdicts[ms][row['source']].append({'freq': row['freq'], 'data': row['data'], 'error': row['error'], 'fitteddata': row['fitteddata']}) rmsmeanvalues = self.computeRMSandMean(webdicts[ms]) score1 = qacalc.score_vla_flux_residual_rms(rmsmeanvalues) scores = [score1] if scores == []: LOG.error('Error with computing flux density bootstrapping residuals') scores = [pqa.QAScore(0.0, longmsg='Unable to compute flux density bootstrapping residuals.', shortmsg='Fluxboot issue.')] result.qa.pool.extend(scores)
[docs] def computeRMSandMean(self, webdicts): rmsmeanvalues = [] for source, datadicts in webdicts.items(): try: frequencies = [] residuals = [] for datadict in datadicts: residuals.append(float(datadict['data']) - float(datadict['fitteddata'])) frequencies.append(float(datadict['freq'])) rms = np.std(residuals) mean = np.mean(residuals) # Count number of residuals outside the mean +/- rms range count = len(residuals) - len([resid for resid in residuals if ((mean - rms) < resid < (mean + rms))]) rmsmeanvalues.append((np.std(residuals), np.mean(residuals), count)) except Exception as e: continue return rmsmeanvalues
[docs]class Fluxboot2ListQAHandler(pqa.QAPlugin): """ QA handler for a list containing FluxbootResults. """ result_cls = collections.Iterable child_cls = Fluxboot2Results
[docs] def handle(self, context, result): # collate the QAScores from each child result, pulling them into our # own QAscore list collated = utils.flatten([r.qa.pool for r in result]) result.qa.pool[:] = collated