#!/usr/bin/env python
import copy
import numpy as np
import pipeline.qa.utility.scorers as scorers
import pipeline.qa.utility.filters as filters
from pipeline.infrastructure import casa_tools
[docs]def gpcal(caltable):
gpcal_stats = gpcal_calc(caltable)
gpcal_scores = gpcal_score(gpcal_stats)
gpcal_qa = {'QANUMBERS': gpcal_stats, 'QASCORES': gpcal_scores}
return gpcal_qa
[docs]def gpcal_calc(caltable):
'''Calculate X-Y and X2-X1 phase calibration statistics for int based
phase calibration tables.'''
removeoutliers = True
with casa_tools.CalAnalysis(caltable) as ca:
antennaNames = ca.antenna1()
fieldNames = ca.field()
tbLoc = casa_tools.table
tbLoc.open(caltable)
fieldIds = np.unique(tbLoc.getcol('FIELD_ID')).tolist()
antIds = np.unique(tbLoc.getcol('ANTENNA1')).tolist()
if 'SPECTRAL_WINDOW_ID' in tbLoc.colnames():
spwIds = np.unique(tbLoc.getcol('SPECTRAL_WINDOW_ID')).tolist()
tbLoc.close()
tbLoc.open(caltable + '/SPECTRAL_WINDOW')
chanfreqs = tbLoc.getcol('CHAN_FREQ')[0]
tbLoc.close()
caltableformat = 'new'
else:
if 'CAL_DESC' not in tbLoc.keywordnames():
tbLoc.close()
gpcal_stats = {'FIELDS': {'ERROR': True}, 'SPWS': {'ERROR': True}, 'ANTENNAS': {'ERROR': True}, 'STATS': {'ERROR': True}}
return gpcal_stats
tbLoc.close()
tbLoc.open(caltable + '/CAL_DESC')
spwIds = tbLoc.getcol('SPECTRAL_WINDOW_ID')[0].tolist()
tbLoc.close()
caltableformat = 'old'
gpcal_stats = {'FIELDS': {}, 'SPWS': {}, 'ANTENNAS': {}, 'STATS': {}}
tbLoc.open(caltable)
for fIndex in range(len(fieldIds)):
fieldId = fieldIds[fIndex]
gpcal_stats['FIELDS'][fieldId] = fieldNames[fIndex]
if fieldId not in gpcal_stats['STATS']:
gpcal_stats['STATS'][fieldId] = {}
for sIndex in range(len(spwIds)):
spwId = spwIds[sIndex]
gpcal_stats['SPWS'][spwId] = spwId
if spwId not in gpcal_stats['STATS'][fieldId]:
gpcal_stats['STATS'][fieldId][spwId] = {}
for aIndex in range(len(antIds)):
antId = antIds[aIndex]
gpcal_stats['ANTENNAS'][antId] = antennaNames[aIndex]
if antId not in gpcal_stats['STATS'][fieldId][spwId]:
gpcal_stats['STATS'][fieldId][spwId][antId] = {}
if caltableformat == 'new':
gpcal_stats['STATS'][fieldId][spwId][antId]['chanFreq (GHz)'] = chanfreqs[spwId]/1.e9
tb1 = tbLoc.query('FIELD_ID == '+str(fieldId)+' AND SPECTRAL_WINDOW_ID == '+str(spwId)+' AND ANTENNA1 == '+str(antId))
else:
tb1 = tbLoc.query('FIELD_ID == '+str(fieldId)+' AND CAL_DESC_ID == '+str(spwids.index(spwId))+' AND ANTENNA1 == '+str(antId))
ngains = tb1.nrows()
if ngains == 0:
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = 'C/C'
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = 'C/C'
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = 'C/C'
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = 'C/C'
continue
if caltableformat == 'new':
phase1 = tb1.getcol('CPARAM')
flag1 = tb1.getcol('FLAG')
else:
phase1 = tb1.getcol('GAIN')
tb1.close()
phase1 = np.angle(phase1)
phase1 = np.unwrap(phase1)
if len(phase1) == 2:
phase2 = []
for i in range(ngains):
# don't use flagged points (see CAS-6804)
if ((flag1[0][0][i]==False) and (flag1[1][0][i]==False)):
phase2.append( phase1[0][0][i] - phase1[1][0][i] )
# Unwrap the difference *after* flagging
phase2 = np.unwrap(np.array(phase2))
# Outlier removal turned on according to new comments in
# CAS-6804. Threshold changed to 5 sigma.
if removeoutliers == True:
phase2 = filters.outlierFilter(phase2, 5.0)
phase3 = []
for i in range(ngains-1):
# don't use flagged points (see CAS-6804)
if ((flag1[0][0][i+1]==False) and (flag1[0][0][i]==False)):
phase3.append( phase1[0][0][i+1] - phase1[0][0][i] )
if removeoutliers == True:
phase3 = filters.outlierFilter(phase3, 3.0)
if caltableformat == 'new':
if ((len(phase1) == 2) and (len(phase2) != 0)):
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = np.rad2deg(np.std(phase2))
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = np.std(phase2)*(2.9979e8/(2*np.pi*chanfreqs[spwId]))
else:
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = 'C/C'
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = 'C/C'
if (len(phase3) != 0):
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = np.rad2deg(np.std(phase3))
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = np.std(phase3)*(2.9979e8/(2*np.pi*chanfreqs[spwId]))
else:
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = 'C/C'
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = 'C/C'
else:
if ((len(phase1) == 2) and (len(phase2) != 0)):
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = np.rad2deg(np.std(phase2))
else:
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (deg)'] = 'C/C'
gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'] = 'C/C'
if (len(phase3) != 0):
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = np.rad2deg(np.std(phase3))
else:
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (deg)'] = 'C/C'
gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'] = 'C/C'
tbLoc.close()
return gpcal_stats
[docs]def gpcal_score(gpcal_stats):
'''Calculate scores for phase statistics.'''
gpcal_scores = {'SCORES': \
{'TOTAL': {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}, \
'XY_TOTAL': {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}, \
'X2X1_TOTAL': {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}}}
gpcal_scores['FIELDS'] = copy.deepcopy(gpcal_stats['FIELDS'])
gpcal_scores['SPWS'] = copy.deepcopy(gpcal_stats['SPWS'])
gpcal_scores['ANTENNAS'] = copy.deepcopy(gpcal_stats['ANTENNAS'])
if 'ERROR' in gpcal_stats['STATS']:
return gpcal_scores
# Using average sigmas for now. Eric's report lists sigmas per band / frequency.
# Need to check if we have to distinguish by band.
xyScorer = scorers.erfScorer(4.25e-6, 8.4e-5)
x2x1Scorer = scorers.erfScorer(3.08e-5, 2.24e-4)
totalXYMetrics = []
for fieldId in gpcal_stats['STATS']:
fieldXYMetrics = []
if fieldId not in gpcal_scores['SCORES']:
gpcal_scores['SCORES'][fieldId] = {}
gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}
gpcal_scores['SCORES'][fieldId]['X2X1_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}
gpcal_scores['SCORES'][fieldId]['TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 1.0}
for spwId in gpcal_stats['STATS'][fieldId]:
if spwId not in gpcal_scores['SCORES'][fieldId]:
gpcal_scores['SCORES'][fieldId][spwId] = {}
for antId in gpcal_stats['STATS'][fieldId][spwId]:
if antId not in gpcal_scores['SCORES'][fieldId][spwId]:
gpcal_scores['SCORES'][fieldId][spwId][antId] = {}
try:
gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_XY'] = xyScorer(gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)'])
currentXYContextScore = {'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'SCORE': gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_XY']}
fieldXYMetrics.append({'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'METRIC': gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)']})
totalXYMetrics.append({'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'METRIC': gpcal_stats['STATS'][fieldId][spwId][antId]['X-Y (m)']})
#gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = \
# filters.minDict([gpcal_scores['SCORES'][fieldId]['XY_TOTAL'], currentXYContextScore], 'SCORE')
#gpcal_scores['SCORES'][fieldId]['TOTAL'] = \
# filters.minDict([gpcal_scores['SCORES'][fieldId]['TOTAL'], currentXYContextScore], 'SCORE')
#gpcal_scores['SCORES']['XY_TOTAL'] = \
# filters.minDict([gpcal_scores['SCORES']['XY_TOTAL'], currentXYContextScore], 'SCORE')
#gpcal_scores['SCORES']['TOTAL'] = \
# filters.minDict([gpcal_scores['SCORES']['TOTAL'], currentXYContextScore], 'SCORE')
except Exception as e:
gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_XY'] = 'C/C'
# Don't count this in the totals since it is likely due to missing solutions because of
# flagged antennas. Need to decide how to account for these cases.
try:
gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_X2X1'] = x2x1Scorer(gpcal_stats['STATS'][fieldId][spwId][antId]['X2-X1 (m)'])
currentX2X1ContextScore = {'FIELD': fieldId, 'SPW': spwId, 'ANTENNA': antId, 'SCORE': gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_X2X1']}
gpcal_scores['SCORES'][fieldId]['X2X1_TOTAL'] = \
filters.minDict([gpcal_scores['SCORES'][fieldId]['X2X1_TOTAL'], currentX2X1ContextScore], 'SCORE')
gpcal_scores['SCORES'][fieldId]['TOTAL'] = \
filters.minDict([gpcal_scores['SCORES'][fieldId]['TOTAL'], currentX2X1ContextScore], 'SCORE')
gpcal_scores['SCORES']['X2X1_TOTAL'] = \
filters.minDict([gpcal_scores['SCORES']['X2X1_TOTAL'], currentX2X1ContextScore], 'SCORE')
gpcal_scores['SCORES']['TOTAL'] = \
filters.minDict([gpcal_scores['SCORES']['TOTAL'], currentX2X1ContextScore], 'SCORE')
except Exception as e:
gpcal_scores['SCORES'][fieldId][spwId][antId]['PHASE_SCORE_X2X1'] = 'C/C'
# Don't count this in the totals since it is likely due to missing solutions because of
# flagged antennas. Need to decide how to account for these cases.
fieldXYMetrics = filters.outlierDictFilter(fieldXYMetrics, 'METRIC', 5.0)
if (len(fieldXYMetrics) > 0):
maxFieldXYMetric = filters.maxDict(fieldXYMetrics, 'METRIC')
gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = {'FIELD': maxFieldXYMetric['FIELD'], 'SPW': maxFieldXYMetric['SPW'], 'ANTENNA': maxFieldXYMetric['ANTENNA'], 'SCORE': xyScorer(maxFieldXYMetric['METRIC'])}
gpcal_scores['SCORES'][fieldId]['TOTAL'] = filters.minDict([gpcal_scores['SCORES'][fieldId]['TOTAL'], gpcal_scores['SCORES'][fieldId]['XY_TOTAL']], 'SCORE')
else:
gpcal_scores['SCORES'][fieldId]['XY_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 'C/C'}
totalXYMetrics = filters.outlierDictFilter(totalXYMetrics, 'METRIC', 5.0)
if (len(totalXYMetrics) > 0):
maxTotalXYMetric = filters.maxDict(totalXYMetrics, 'METRIC')
gpcal_scores['SCORES']['XY_TOTAL'] = {'FIELD': maxTotalXYMetric['FIELD'], 'SPW': maxTotalXYMetric['SPW'], 'ANTENNA': maxTotalXYMetric['ANTENNA'], 'SCORE': xyScorer(maxTotalXYMetric['METRIC'])}
gpcal_scores['SCORES']['TOTAL'] = filters.minDict([gpcal_scores['SCORES']['TOTAL'], gpcal_scores['SCORES']['XY_TOTAL']], 'SCORE')
else:
gpcal_scores['SCORES']['XY_TOTAL'] = {'FIELD': 'N/A', 'SPW': 'N/A', 'ANTENNA': 'N/A', 'SCORE': 'C/C'}
return gpcal_scores