Source code for pipeline.hifa.tasks.importdata.qa

import collections
from itertools import chain
from typing import List, Tuple, Dict, Callable, Set

import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.pipelineqa as pqa
import pipeline.infrastructure.utils as utils
import pipeline.qa.scorecalculator as qacalc
from pipeline.domain.field import Field
from pipeline.domain.measurementset import MeasurementSet
from pipeline.h.tasks.exportdata import aqua
from pipeline.infrastructure import casa_tools
from .almaimportdata import ALMAImportDataResults

LOG = logging.get_logger(__name__)

aqua_exporter = aqua.xml_generator_for_metric('ScoreParallacticAngle', '{:0.3f}')
aqua.register_aqua_metric(aqua_exporter)


[docs]class ALMAImportDataListQAHandler(pqa.QAPlugin): result_cls = collections.Iterable child_cls = ALMAImportDataResults
[docs] def handle(self, context, result): super().handle(context, result) # Check per-session parallactic angle coverage of polarisation calibration parallactic_threshold = result.inputs['minparang'] # gather mses into a flat list mses = list(chain(*(r.mses for r in result))) # PIPE-597 spec states to test POLARIZATION intent intents_to_test = {'POLARIZATION'} parang_scores, parang_ranges = _check_parallactic_angle_range(mses, intents_to_test, parallactic_threshold) result.qa.pool.extend(parang_scores) result.parang_ranges = parang_ranges
[docs]class ALMAImportDataQAHandler(pqa.QAPlugin): result_cls = ALMAImportDataResults child_cls = None
[docs] def handle(self, context, result): # Check for the presense of polarization intents recipe_name = context.project_structure.recipe_name polcal_scores = _check_polintents(recipe_name, result.mses) # Check for the presence of receiver bands with calibration issues score2 = _check_bands(result.mses) # Check for the presence of bandwidth switching score3 = _check_bwswitching(result.mses) # Check for science spw names matching the virtual spw ID lookup table score4 = _check_science_spw_names(result.mses, context.observing_run.virtual_science_spw_names) # Flux service usage score5 = _check_fluxservice(result) result.qa.pool.extend(polcal_scores) result.qa.pool.extend([score2, score3, score4, score5])
def _check_polintents(recipe_name: str, mses: List[MeasurementSet]) -> List[pqa.QAScore]: """ Check each measurement set for polarization intents """ return qacalc.score_polintents(recipe_name, mses) def _check_parallactic_angle_range(mses: List[MeasurementSet], intents: Set[str], threshold: float) -> Tuple[List[pqa.QAScore], Dict]: """ Check that the parallactic angle coverage of the polarisation calibrator meets the required threshold. See PIPE-597 and PIPE-598 for full spec. :param mses: MeasurementSets to check :param intents: intents to measure :param threshold: minimum parallactic angle coverage :return: list of QAScores and dictionary of metrics """ # holds list of all QA scores for this metric all_scores: List[pqa.QAScore] = [] # holds all parallactic angle ranges for all # session names, intents and pol cal names all_metrics = {'sessions': {}, 'pol_intents_found': False} intents_present = any([intents.intersection(ms.intents) for ms in mses]) # group MSes per sessions, adding to default 'Shared' session if not # defined session_to_mses = collections.defaultdict(list) for ms in mses: session_to_mses[getattr(ms, 'session', 'Shared')].append(ms) # Check parallactic angle for each polcal in each session for session_name, session_mses in session_to_mses.items(): all_metrics['sessions'][session_name] = {'min_parang_range': 360.0, 'vis': [ms_do.name for ms_do in session_mses]} for intent in intents: all_metrics['sessions'][session_name][intent] = {} polcal_names = {polcal.name for ms in session_mses for polcal in ms.get_fields(intent=intent)} if len(polcal_names) > 0: all_metrics['pol_intents_found'] = True for polcal_name in polcal_names: parallactic_range = ous_parallactic_range(session_mses, polcal_name, intent) all_metrics['sessions'][session_name][intent][polcal_name] = parallactic_range all_metrics['sessions'][session_name]['min_parang_range'] = min(all_metrics['sessions'][session_name]['min_parang_range'], parallactic_range) LOG.info(f'Parallactic angle range for {polcal_name} ({intent}) in session {session_name}: ' f'{parallactic_range}') session_scores = qacalc.score_parallactic_range( intents_present, session_name, polcal_name, parallactic_range, threshold ) all_scores.extend(session_scores) return all_scores, all_metrics def _check_bands(mses) -> pqa.QAScore: """ Check each measurement set for bands with calibration issues """ return qacalc.score_bands(mses) def _check_bwswitching(mses) -> pqa.QAScore: """ Check each measurement set for bandwidth switching calibration issues """ return qacalc.score_bwswitching(mses) def _check_science_spw_names(mses, virtual_science_spw_names) -> pqa.QAScore: """ Check science spw names """ return qacalc.score_science_spw_names(mses, virtual_science_spw_names) def _check_fluxservice(result) -> pqa.QAScore: """ Check flux service usage """ return qacalc.score_fluxservice(result) #- functions to measure parallactic angle coverage of polarisation calibrator ------------------------------------------ # Type aliases for the parallactic angle computations. ParallacticAngle = float SignedAngle = float PositiveDefiniteAngle = float
[docs]def ous_parallactic_range(mses: List[MeasurementSet], field_name: str, intent: str): """ Get the parallactic angle range across all measurement sets for field f when observed with the specifiedintent. :param mses: MeasurementSets to process :param f: Field to inspect :param intent: observing intent to consider :return: angular range expressed as (min angle, max angle) tuple """ angles = [] for ms in mses: fields = ms.get_fields(task_arg=field_name) if len(fields) != 1: LOG.error('Cannot determine parallactic angle for %s field: %s', ms.basename, field_name) continue field = fields[0] try: angles.extend(parallactic_range_for_field(ms, field, intent)) except ValueError as e: LOG.exception('Could not determine parallactic angle', exc_info=e) continue if not angles: return signed_range = range_after_processing(angles, to_signed) pd_range = range_after_processing(angles, to_positive_definite) return min((signed_range, pd_range))
[docs]def parallactic_range_for_field(ms: MeasurementSet, f: Field, intent: str) -> Tuple[float, float]: """ Get the parallactic angle range for field f when observed with the specified intent. :param ms: MeasurementSet to process :param f: Field to inspect :param intent: observing intent to consider :return: angular range expressed as (min angle, max angle) tuple """ # get the scans when the field was observed with the required intent scans = ms.get_scans(field=f.id, scan_intent=intent) if not scans: raise ValueError(f'No scans detected for field {f} with intent {intent}') # This uses the earliest start and latest end times of the scan domain # objects. In theory, times within a scan can be field and spw dependent so # they may not exactly match those of the field in question, but they # should be accurate enough for our purposes min_utc = min([s.start_time for s in scans], key=utils.get_epoch_as_datetime) max_utc = max([s.end_time for s in scans], key=utils.get_epoch_as_datetime) min_pa = parallactic_angle_at_epoch(f, min_utc) max_pa = parallactic_angle_at_epoch(f, max_utc) if min_pa > max_pa: min_pa, max_pa = max_pa, min_pa return min_pa, max_pa
[docs]def parallactic_angle_at_epoch(f: Field, e: dict) -> float: """ Get the instantaneous parallactic angle for field f at epoch e. :param f: Field domain object :param e: CASA epoch :return: angular separation in degrees """ me = casa_tools.measures qa = casa_tools.quanta try: me.doframe(me.observatory('ALMA')) me.doframe(e) pole_direction = me.direction('J2000', '0deg', '+90deg') pole_azel = me.measure(pole_direction, 'AZEL') field_azel = me.measure(f.mdirection, 'AZEL') separation = me.posangle(field_azel, pole_azel) sep_degs = qa.convert(separation, 'deg') return sep_degs['value'] finally: me.done()
[docs]def range_after_processing(fs: List[float], g: Callable[[float], float]): """ Get the range of a list of floats (fs) once processed by function g. """ processed = [g(f) for f in fs] return max(processed) - min(processed)
[docs]def to_signed(angle: ParallacticAngle) -> SignedAngle: if angle > 180: return angle-360 return angle
[docs]def to_positive_definite(angle: ParallacticAngle) -> PositiveDefiniteAngle: if angle < 0: return angle+360 return angle
#- end parallactic angle coverage functions ----------------------------------------------------------------------------