Source code for pipeline.hsd.tasks.baseline.qa

import numpy

import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.pipelineqa as pqa
import pipeline.infrastructure.utils as utils
import pipeline.qa.scorecalculator as qacalc
from . import baseline

LOG = logging.get_logger(__name__)


[docs]class SDBaselineQAHandler(pqa.QAPlugin): result_cls = baseline.SDBaselineResults child_cls = None
[docs] def handle(self, context, result): scores = [] lines_list = [] group_id_list = [] spw_id_list = [] field_id_list = [] reduction_group = context.observing_run.ms_reduction_group baselined = result.outcome['baselined'] for b in baselined: reduction_group_id = b['group_id'] members = b['members'] group_desc = reduction_group[reduction_group_id] spw_id = numpy.fromiter((group_desc[m].spw_id for m in members), dtype=numpy.int32) # b['spw'] field_id = numpy.fromiter((group_desc[m].field_id for m in members), dtype=numpy.int32) # b['field'] lines = b['lines'] lines_list.append(lines) group_id_list.append(reduction_group_id) spw_id_list.append(spw_id) field_id_list.append(field_id) scores.append(qacalc.score_sd_line_detection_for_ms(group_id_list, field_id_list, spw_id_list, lines_list)) result.qa.pool.extend(scores)
[docs]class SDBaselineListQAHandler(pqa.QAPlugin): result_cls = basetask.ResultsList child_cls = baseline.SDBaselineResults
[docs] def handle(self, context, result): # collate the QAScores from each child result, pulling them into our # own QAscore list collated = utils.flatten([r.qa.pool for r in result]) result.qa.pool[:] = collated