import math
import numpy as np
import pipeline.infrastructure as infrastructure
from pipeline.h.tasks.common import commonresultobjects
from pipeline.infrastructure import casa_tools
LOG = infrastructure.get_logger(__name__)
[docs]def calculate_qa_numbers(result):
"""Calculate a single number from the qa views stored in result.
result -- The Qa2Result object containing the Qa2 views.
"""
qa_per_view = {}
for description in result.descriptions():
qa_result = result.last(description)
qa_data = qa_result.data
qa_flag = qa_result.flag
# qa score is no-wvr rms / with-wvr rms
qa_per_view[description] = 1.0 / np.median(qa_data[qa_flag == False])
result.view_score = qa_per_view
result.overall_score = np.median(list(qa_per_view.values()))
[docs]def calculate_view(context, nowvrtable, withwvrtable, result, qa_intent):
# get phase rms results for no-wvr case and with-wvr case
nowvr_results = calculate_phase_rms(context, nowvrtable, qa_intent)
wvr_results = calculate_phase_rms(context, withwvrtable, qa_intent)
for k, v in wvr_results.items():
result.vis = v.filename
# the ratio withwvr/nowvr is the view we want
nowvr_data = nowvr_results[k].data
nowvr_flag = nowvr_results[k].flag
wvr_data = wvr_results[k].data
wvr_flag = wvr_results[k].flag
oldseterr = np.seterr(divide='ignore', invalid='ignore')
data = wvr_data / nowvr_data
data_flag = (nowvr_flag | wvr_flag | (nowvr_data == 0))
data_flag[np.isinf(data)] = True
data_flag[np.isnan(data)] = True
np.seterr(**oldseterr)
axes = v.axes
improvement_result = commonresultobjects.ImageResult(
v.filename, data=data,
datatype='with-wvr phase rms / no-wvr phase rms',
axes=axes, flag=data_flag, intent=v.intent, spw=v.spw)
result.addview(improvement_result.description, improvement_result)
[docs]def calculate_phase_rms(context, gaintable, qa_intent):
"""
tsystable -- CalibrationTableData object giving access to the wvrg
caltable.
"""
# raise an exception when np encounters any error
olderr = np.seterr(all='raise')
try:
phase_rms_results = {}
with casa_tools.TableReader(gaintable) as table:
vis = table.getkeyword('MSName')
# access field names via domain object
ms = context.observing_run.get_ms(name=vis)
# for each intent in the QA intent list, create a phase RMS result
intent_list = set(qa_intent.split(','))
for intent in intent_list:
# identify scans corresponding to this intent
intent_scans = [scan.id for scan in ms.scans if intent in scan.intents]
if len(intent_scans) <= 0:
continue
sb = table.query('SCAN_NUMBER in %s' % str(intent_scans))
spectral_window_id = sb.getcol('SPECTRAL_WINDOW_ID')
spwids = sorted(set(spectral_window_id))
antenna1 = sb.getcol('ANTENNA1')
antennas = set(antenna1)
timecol = sb.getcol('TIME')
# Are there any times.
times = sorted(set(timecol))
if len(times) <= 0:
sb.done()
continue
times = np.array(times)
# look for continuously sampled chunks of data. Sometimes
# there are small gaps of 10-15 sec within lengths of data
# that we want to regard as continuous, so make the minimum
# 'gap' larger than this.
time_chunks = findchunks(times, gap_time=30.0)
cparam = sb.getcol('CPARAM')
flag = sb.getcol('FLAG')
rows = np.arange(sb.nrows())
# to eliminate the effect of the refant on the results
# (i.e. phase rms of refant is always 0) we first
# construct 8 versions of the gain table with the entries
# corresponding to 8 different values for the refant.
cparam_refant = {}
flag_refant = {}
refants = list(antennas)[:8]
for refant in refants:
cparam_refant[refant] = np.array(cparam)
flag_refant[refant] = np.array(flag)
for spwid in spwids:
for t in times:
# gains for all antennas at time t
selected_rows = rows[
(spectral_window_id == spwid) & (timecol == t)]
gain = cparam_refant[refant][0, 0, selected_rows]
# gain_flag = flag_refant[refant][0, 0, selected_rows]
# gain for refant at time t
refant_row = rows[(spectral_window_id == spwid)
& (timecol == t)
& (antenna1 == refant)]
if refant_row:
refant_gain = \
cparam_refant[refant][0, 0, refant_row][0]
refant_gain_flag = \
flag_refant[refant][0, 0, refant_row][0]
else:
refant_gain_flag = True
# now modify entries to make refant the reference
if not refant_gain_flag:
# use a.b = ab cos(theta) axb = ab sin(theta) to
# calculate theta
dot_product = \
gain.real * refant_gain.real \
+ gain.imag * refant_gain.imag
cross_product = \
gain.real * refant_gain.imag \
- gain.imag * refant_gain.real
complex_rel_phase = np.zeros([len(dot_product)],
np.complex)
complex_rel_phase.imag = np.arctan2(
cross_product, dot_product)
cparam_refant[refant][0, 0, selected_rows] = \
np.abs(gain) * np.exp(-complex_rel_phase)
else:
flag_refant[refant][0, 0, selected_rows] = True
# now calculate the phase rms views
for spwid in spwids:
data = np.zeros([max(antennas)+1, len(time_chunks)])
data_flag = np.ones([max(antennas)+1, len(time_chunks)],
np.bool)
chunk_base_times = np.zeros([len(time_chunks)])
for antenna in antennas:
selected_rows = rows[(spectral_window_id == spwid)
& (antenna1 == antenna)]
gain_times = timecol[selected_rows]
for i, chunk in enumerate(time_chunks):
chunk_start = times[chunk[0]] - 0.1
chunk_end = times[chunk[-1]] + 0.1
chunk_select = \
(gain_times >= chunk_start)\
& (gain_times <= chunk_end)
if not chunk_select.any():
continue
gain_times_chunk = gain_times[chunk_select]
chunk_base_times[i] = gain_times_chunk[0]
rms_refant = np.zeros([8])
rms_refant_flag = np.ones([8], np.bool)
for refant in refants:
gain = cparam_refant[refant][0, 0, selected_rows]
gain_chunk = gain[chunk_select]
gain_flag = flag_refant[refant][0, 0, selected_rows]
gain_flag_chunk = gain_flag[chunk_select]
try:
valid = np.logical_not(gain_flag_chunk)
if not np.any(valid):
continue
# complex median: median(reals) +
# i*median(imags)
cmedian = complex(
np.median(gain_chunk.real[valid]),
np.median(gain_chunk.imag[valid]))
# use a.b = ab cos(theta) and
# axb = ab sin(theta) to calculate theta
dot_product = \
gain_chunk.real[valid] * cmedian.real\
+ gain_chunk.imag[valid] * cmedian.imag
cross_product = \
gain_chunk.real[valid] * cmedian.imag\
- gain_chunk.imag[valid] * cmedian.real
phases = \
np.arctan2(cross_product, dot_product)\
* 180.0 / math.pi
# calculate phase rms
phases *= phases
phase_rms = np.sum(phases) / float(len(phases))
phase_rms = math.sqrt(phase_rms)
rms_refant[refant] = phase_rms
rms_refant_flag[refant] = False
except:
pass
# set view
valid_data = rms_refant[
np.logical_not(rms_refant_flag)]
if len(valid_data) > 0:
data[antenna, i] = np.median(valid_data)
data_flag[antenna, i] = False
axes = [
commonresultobjects.ResultAxis(
name='Antenna', units='id',
data=np.arange(max(antennas)+1)),
commonresultobjects.ResultAxis(
name='Time', units='', data=chunk_base_times)]
phase_rms_result = commonresultobjects.ImageResult(
filename=vis, data=data, datatype='r.m.s. phase',
axes=axes, flag=data_flag, intent=intent, spw=spwid,
units='degrees')
phase_rms_results[phase_rms_result.description] = \
phase_rms_result
# free sub table and its resources
sb.done()
return phase_rms_results
finally:
np.seterr(**olderr)
[docs]def findchunks(times, gap_time):
"""Return a list of arrays, each containing the indices of a chunk
of data i.e. a sequence of equally spaced measurements separated
from other chunks by larger time gaps.
Keyword arguments:
times -- Numeric array of times at which the measurements
were taken.
gap_time -- Minimum gap that signifies a 'gap'.
"""
difference = times[1:] - times[:-1]
median_diff = np.median(difference)
gap_diff = max(1.5 * median_diff, gap_time)
chunks = []
chunk = [0]
for i in np.arange(len(difference)):
if difference[i] < gap_diff:
chunk.append(i+1)
else:
chunks.append(np.array(chunk))
chunk = [i+1]
chunks.append(np.array(chunk))
return chunks