# Source code for pipeline.hsd.tasks.baselineflag.qa

import collections
import collections.abc

import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.pipelineqa as pqa
import pipeline.qa.scorecalculator as qacalc
from . import baselineflag
from .renderer import accumulate_flag_per_source_spw

LOG = logging.get_logger(__name__)


class SDBLFlagListQAHandler(pqa.QAPlugin):
    """QA handler for an iterable of ``SDBLFlagResults``.

    Computes one QA score per (field, spw) pair from the flag counts
    accumulated over all results in the list and stores the scores in the
    QA pool of the list itself.
    """

    # The ABC must be taken from ``collections.abc``: the ``collections.Iterable``
    # alias was removed in Python 3.10.
    result_cls = collections.abc.Iterable
    child_cls = baselineflag.SDBLFlagResults

    def handle(self, context, result):
        """Score the fraction of data flagged per field and spw.

        Args:
            context: pipeline context, passed through to
                ``accumulate_flag_per_source_spw``.
            result: iterable of results to be scored; its ``qa.pool`` is
                overwritten in place with the new scores.
        """
        # Accumulate flag per field, spw to a dictionary
        #   accum_flag[field][spw] = {'additional': # of flagged in task,
        #                             'total': # of total}
        accum_flag = accumulate_flag_per_source_spw(context, result)

        # Now define score per field, spw
        scores = []
        for field, spwflag in accum_flag.items():
            for spw, flagval in spwflag.items():
                # NOTE(review): a zero 'total' raises ZeroDivisionError here;
                # confirm the accumulator never reports an empty selection.
                frac_flagged = flagval['additional'] / float(flagval['total'])
                label = "Field %s Spw %s" % (field, spw)
                scores.append(
                    qacalc.score_sdtotal_data_flagged(label, frac_flagged))

        # Replace the pool contents in place so that existing references to
        # the pool list remain valid.
        result.qa.pool[:] = scores
class SDBLFlagQAHandler(pqa.QAPlugin):
    """QA handler for a single ``SDBLFlagResults``.

    Produces one QA score per (field, spw) pair, labelled with the value of
    the result's ``vis`` input, and stores the scores in the result's QA pool.
    """

    result_cls = baselineflag.SDBLFlagResults
    child_cls = None

    def handle(self, context, result):
        """Score the fraction of data flagged per field and spw.

        Args:
            context: pipeline context, passed through to
                ``accumulate_flag_per_source_spw``.
            result: the result to be scored; its ``qa.pool`` is overwritten
                in place with the new scores.
        """
        # The accumulator is called with a list of results, so the single
        # result is wrapped in a one-element list.
        flag_counts = accumulate_flag_per_source_spw(context, [result])
        ms_name = result.inputs['vis']

        def score_for(field_id, spw_id, counts):
            # counts holds 'additional' (flagged in task) and 'total'.
            fraction = counts['additional'] / float(counts['total'])
            title = '{!s} Field {!s} Spw {!s}'.format(ms_name, field_id, spw_id)
            return qacalc.score_sdtotal_data_flagged(title, fraction)

        # One score per field, spw; the pool is updated in place.
        result.qa.pool[:] = [
            score_for(field_id, spw_id, counts)
            for field_id, per_spw in flag_counts.items()
            for spw_id, counts in per_spw.items()
        ]
# NOTE: the AQUA metric registration below is disabled (left commented out).
# from pipeline.h.tasks.exportdata import aqua
# aqua_exporter = aqua.xml_generator_for_metric('score_sdtotal_data_flagged', '{:0.3%}')
# aqua.register_aqua_metric(aqua_exporter)