Source code for pipeline.qa.gpcal

#!/usr/bin/env python
import copy

import numpy as np

import pipeline.qa.utility.scorers as scorers
import pipeline.qa.utility.filters as filters
from pipeline.infrastructure import casa_tools


[docs]def gpcal(caltable): gpcal_stats = gpcal_calc(caltable) gpcal_scores = gpcal_score(gpcal_stats) gpcal_qa = {'QANUMBERS': gpcal_stats, 'QASCORES': gpcal_scores} return gpcal_qa
[docs]def gpcal_calc(caltable): '''Calculate X-Y and X2-X1 phase calibration statistics for int based phase calibration tables.''' removeoutliers = True with casa_tools.CalAnalysis(caltable) as ca: antennaNames = ca.antenna1() fieldNames = ca.field() tbLoc = casa_tools.table tbLoc.open(caltable) fieldIds = np.unique(tbLoc.getcol('FIELD_ID')).tolist() antIds = np.unique(tbLoc.getcol('ANTENNA1')).tolist() if 'SPECTRAL_WINDOW_ID' in tbLoc.colnames(): spwIds = np.unique(tbLoc.getcol('SPECTRAL_WINDOW_ID')).tolist() tbLoc.close() tbLoc.open(caltable + '/SPECTRAL_WINDOW') chanfreqs = tbLoc.getcol('CHAN_FREQ')[0] tbLoc.close() caltableformat = 'new' else: if 'CAL_DESC' not in tbLoc.keywordnames(): tbLoc.close() gpcal_stats = {'FIELDS': {'ERROR': True}, 'SPWS': {'ERROR': True}, 'ANTENNAS': {'ERROR': True}, 'STATS': {'ERROR': True}} return gpcal_stats tbLoc.close() tbLoc.open(caltable + '/CAL_DESC') spwIds = tbLoc.getcol('SPECTRAL_WINDOW_ID')[0].tolist() tbLoc.close() caltableformat = 'old' gpcal_stats = {'FIELDS': {}, 'SPWS': {}, 'ANTENNAS': {}, 'STATS': {}} tbLoc.open(caltable) for fIndex in range(len(fieldIds)): fieldId = fieldIds[fIndex] gpcal_stats['FIELDS'][fieldId] = fieldNames[fIndex] if fieldId not in gpcal_stats['STATS']: gpcal_stats['STATS'][fieldId] = {} for sIndex in range(len(spwIds)): spwId = spwIds[sIndex] gpcal_stats['SPWS'][spwId] = spwId if spwId not in gpcal_stats['STATS'][fieldId]: gpcal_stats['STATS'][fieldId][spwId] = {} for aIndex in range(len(antIds)): antId = antIds[aIndex] gpcal_stats['ANTENNAS'][antId] = antennaNames[aIndex] if antId not in gpcal_stats['STATS'][fieldId][spwId]: gpcal_stats['STATS'][fieldId][spwId][antId] = {} if caltableformat == 'new': gpcal_stats['STATS'][fieldId][spwId][antId]['chanFreq (GHz)'] = chanfreqs[spwId]/1.e9 tb1 = tbLoc.query('FIELD_ID == '+str(fieldId)+' AND SPECTRAL_WINDOW_ID == '+str(spwId)+' AND ANTENNA1 == '+str(antId)) else: tb1 = tbLoc.query('FIELD_ID == '+str(fieldId)+' AND CAL_DESC_ID == '+str(spwids.index(spwId))+' AND ANTENNA1 == '+str(antId)) ngains = tb1.nrows() if ngains == 0: gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = 'C/C' gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = 'C/C' gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = 'C/C' gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = 'C/C' continue if caltableformat == 'new': phase1 = tb1.getcol('CPARAM') flag1 = tb1.getcol('FLAG') else: phase1 = tb1.getcol('GAIN') tb1.close() phase1 = np.angle(phase1) phase1 = np.unwrap(phase1) if len(phase1) == 2: phase2 = [] for i in range(ngains): # don't use flagged points (see CAS-6804) if ((flag1[0][0][i]==False) and (flag1[1][0][i]==False)): phase2.append( phase1[0][0][i] - phase1[1][0][i] ) # Unwrap the difference *after* flagging phase2 = np.unwrap(np.array(phase2)) # Outlier removal turned on according to new comments in # CAS-6804. Threshold changed to 5 sigma. if removeoutliers == True: phase2 = filters.outlierFilter(phase2, 5.0) phase3 = [] for i in range(ngains-1): # don't use flagged points (see CAS-6804) if ((flag1[0][0][i+1]==False) and (flag1[0][0][i]==False)): phase3.append( phase1[0][0][i+1] - phase1[0][0][i] ) if removeoutliers == True: phase3 = filters.outlierFilter(phase3, 3.0) if caltableformat == 'new': if ((len(phase1) == 2) and (len(phase2) != 0)): gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = np.rad2deg(np.std(phase2)) gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = np.std(phase2)*(2.9979e8/(2*np.pi*chanfreqs[spwId])) else: gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = 'C/C' gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = 'C/C' if (len(phase3) != 0): gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = np.rad2deg(np.std(phase3)) gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = np.std(phase3)*(2.9979e8/(2*np.pi*chanfreqs[spwId])) else: gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = 'C/C' gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = 'C/C' else: if ((len(phase1) == 2) and (len(phase2) != 0)): gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = np.rad2deg(np.std(phase2)) else: gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = 'C/C' gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = 'C/C' if (len(phase3) != 0): gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = np.rad2deg(np.std(phase3)) else: gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = 'C/C' gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = 'C/C' tbLoc.close() return gpcal_stats
[docs]def gpcal_score(gpcal_stats): '''Calculate scores for phase statistics.''' gpcal_scores = {'SCORES': \ {'TOTAL': {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}, \ 'XY_TOTAL': {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}, \ 'X2X1_TOTAL': {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}}} gpcal_scores['FIELDS'] = copy.deepcopy(gpcal_stats['FIELDS']) gpcal_scores['SPWS'] = copy.deepcopy(gpcal_stats['SPWS']) gpcal_scores['ANTENNAS'] = copy.deepcopy(gpcal_stats['ANTENNAS']) if 'ERROR' in gpcal_stats['STATS']: return gpcal_scores # Using average sigmas for now. Eric's report lists sigmas per band / frequency. # Need to check if we have to distinguish by band. xyScorer = scorers.erfScorer(4.25e-6, 8.4e-5) x2x1Scorer = scorers.erfScorer(3.08e-5, 2.24e-4) totalXYMetrics = [] for fieldId in gpcal_stats['STATS']: fieldXYMetrics = [] if fieldId not in gpcal_scores['SCORES']: gpcal_scores['SCORES'][fieldId] = {} gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0} gpcal_scores['SCORES'][fieldId]['X2X1_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0} gpcal_scores['SCORES'][fieldId]['TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0} for spwId in gpcal_stats['STATS'][fieldId]: if spwId not in gpcal_scores['SCORES'][fieldId]: gpcal_scores['SCORES'][fieldId][spwId] = {} for antId in gpcal_stats['STATS'][fieldId][spwId]: if antId not in gpcal_scores['SCORES'][fieldId][spwId]: gpcal_scores['SCORES'][fieldId][spwId][antId] = {} try: gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_XY'] = xyScorer(gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)']) currentXYContextScore = {'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'SCORE': gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_XY']} fieldXYMetrics.append({'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'METRIC': gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)']}) totalXYMetrics.append({'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'METRIC': gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)']}) #gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = \ # filters.minDict([gpcal_scores['SCORES'][fieldId]['XY_TOTAL'], currentXYContextScore], 'SCORE') #gpcal_scores['SCORES'][fieldId]['TOTAL'] = \ # filters.minDict([gpcal_scores['SCORES'][fieldId]['TOTAL'], currentXYContextScore], 'SCORE') #gpcal_scores['SCORES']['XY_TOTAL'] = \ # filters.minDict([gpcal_scores['SCORES']['XY_TOTAL'], currentXYContextScore], 'SCORE') #gpcal_scores['SCORES']['TOTAL'] = \ # filters.minDict([gpcal_scores['SCORES']['TOTAL'], currentXYContextScore], 'SCORE') except Exception as e: gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_XY'] = 'C/C' # Don't count this in the totals since it is likely due to missing solutions because of # flagged antennas. Need to decide how to account for these cases. try: gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_X2X1'] = x2x1Scorer(gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)']) currentX2X1ContextScore = {'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'SCORE': gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_X2X1']} gpcal_scores['SCORES'][fieldId]['X2X1_TOTAL'] = \ filters.minDict([gpcal_scores['SCORES'][fieldId]['X2X1_TOTAL'], currentX2X1ContextScore], 'SCORE') gpcal_scores['SCORES'][fieldId]['TOTAL'] = \ filters.minDict([gpcal_scores['SCORES'][fieldId]['TOTAL'], currentX2X1ContextScore], 'SCORE') gpcal_scores['SCORES']['X2X1_TOTAL'] = \ filters.minDict([gpcal_scores['SCORES']['X2X1_TOTAL'], currentX2X1ContextScore], 'SCORE') gpcal_scores['SCORES']['TOTAL'] = \ filters.minDict([gpcal_scores['SCORES']['TOTAL'], currentX2X1ContextScore], 'SCORE') except Exception as e: gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_X2X1'] = 'C/C' # Don't count this in the totals since it is likely due to missing solutions because of # flagged antennas. Need to decide how to account for these cases. fieldXYMetrics = filters.outlierDictFilter(fieldXYMetrics, 'METRIC', 5.0) if (len(fieldXYMetrics) > 0): maxFieldXYMetric = filters.maxDict(fieldXYMetrics, 'METRIC') gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = {'FIELD': maxFieldXYMetric['FIELD'], 'SPW': maxFieldXYMetric['SPW'], 'ANTENNA': maxFieldXYMetric['ANTENNA'], 'SCORE': xyScorer(maxFieldXYMetric['METRIC'])} gpcal_scores['SCORES'][fieldId]['TOTAL'] = filters.minDict([gpcal_scores['SCORES'][fieldId]['TOTAL'], gpcal_scores['SCORES'][fieldId]['XY_TOTAL']], 'SCORE') else: gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 'C/C'} totalXYMetrics = filters.outlierDictFilter(totalXYMetrics, 'METRIC', 5.0) if (len(totalXYMetrics) > 0): maxTotalXYMetric = filters.maxDict(totalXYMetrics, 'METRIC') gpcal_scores['SCORES']['XY_TOTAL'] = {'FIELD': maxTotalXYMetric['FIELD'], 'SPW': maxTotalXYMetric['SPW'], 'ANTENNA': maxTotalXYMetric['ANTENNA'], 'SCORE': xyScorer(maxTotalXYMetric['METRIC'])} gpcal_scores['SCORES']['TOTAL'] = filters.minDict([gpcal_scores['SCORES']['TOTAL'], gpcal_scores['SCORES']['XY_TOTAL']], 'SCORE') else: gpcal_scores['SCORES']['XY_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 'C/C'} return gpcal_scores