"""
Example usage:
inputs = pipeline.vla.tasks.priorcals.Priorcals.Inputs(context)
task = pipeline.vla.tasks.priorcals.Priocals(inputs)
result = task.exectue(dry_run=False)
result.accept(context)
"""
import datetime
import os
import urllib
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.hif.tasks.antpos import Antpos
from pipeline.hifv.tasks.gaincurves import GainCurves
from pipeline.hifv.tasks.opcal import Opcal
from pipeline.hifv.tasks.rqcal import Rqcal
from pipeline.hifv.tasks.swpowcal import Swpowcal
from pipeline.hifv.tasks.tecmaps import TecMaps
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from . import resultobjects
LOG = infrastructure.get_logger(__name__)
[docs]def correct_ant_posns(vis_name, print_offsets=False):
"""
Given an input visibility MS name (vis_name), find the antenna
position offsets that should be applied. This application should
be via the gencal task, using caltype='antpos'.
If the print_offsets parameter is True, will print out each of
the found offsets (or indicate that none were found), otherwise
runs silently.
A list is returned where the first element is the returned error
code, the second element is a string of the antennas, and the
third element is a list of antenna Bx,By,Bz offsets. An example
return list might look like:
[ 0, 'ea01,ea19', [0.0184, -0.0065, 0.005, 0.0365, -0.0435, 0.0543] ]
Usage examples:
CASA <1>: antenna_offsets = correct_ant_posns('test.ms')
CASA <2>: if (antenna_offsets[0] == 0):
CASA <3>: gencal(vis='test.ms', caltable='cal.G', \
caltype='antpos', antenna=antenna_offsets[1], \
parameter=antenna_offsets[2])
This function does NOT work for VLA datasets, only EVLA. If an
attempt is made to use the function for VLA data (prior to 2010),
an error code of 1 is returned.
The offsets are retrieved over the internet. A description and the
ability to manually examine and retrieve offsets is at:
http://www.vla.nrao.edu/astro/archive/baselines/
If the attempt to establish the internet connection fails, an error
code of 2 is returned.
Uses the same algorithm that the AIPS task VLANT does.
bjb
nrao
spring 2012
Update:
BJB
NRAO
Spring 2020 (fixed version)
"""
MONTHS = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN',
'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
URL_BASE = 'http://www.vla.nrao.edu/cgi-bin/evlais_blines.cgi?Year='
#
# get start date+time of observation
#
with casa_tools.TableReader(vis_name+'/OBSERVATION') as table:
time_range = table.getcol('TIME_RANGE')
MJD_start_time = time_range[0][0] / 86400
q1 = casa_tools.quanta.quantity(time_range[0][0], 's')
date_time = casa_tools.quanta.time(q1, form='ymd')
# date_time looks like: '2011/08/10/06:56:49'
[obs_year, obs_month, obs_day, obs_time_string] = date_time[0].split('/')
if int(obs_year) < 2010:
if print_offsets:
LOG.warn('Does not work for VLA observations')
return [1, '', []]
[obs_hour, obs_minute, obs_second] = obs_time_string.split(':')
obs_time = 10000*int(obs_year) + 100*int(obs_month) + int(obs_day) + \
int(obs_hour)/24.0 + int(obs_minute)/1440.0 + \
int(obs_second)/86400.0
#
# get antenna to station mappings
#
with casa_tools.TableReader(vis_name+'/ANTENNA') as table:
ant_names = table.getcol('NAME')
ant_stations = table.getcol('STATION')
ant_num_stas = []
for ii in range(len(ant_names)):
ant_num_stas.append([int(ant_names[ii][2:]), ant_names[ii], ant_stations[ii], 0.0, 0.0, 0.0, False])
correction_lines = []
current_year = datetime.datetime.now().year
# first, see if the internet connection is possible
try:
response = urllib.request.urlopen(URL_BASE + '2010')
except urllib.error.URLError as err:
if print_offsets:
LOG.warn('No internet connection to antenna position correction URL {}'.format(err.reason))
return [2, '', []]
response.close()
for year in range(2010, current_year+1):
response = urllib.request.urlopen(URL_BASE + str(year))
htmlresponse = response.read()
response.close()
html = htmlresponse.decode()
html_lines = html.split('\n')
for correction_line in html_lines:
if len(correction_line) and correction_line[0] != '<' and correction_line[0] != ';':
for month in MONTHS:
if month in correction_line:
correction_lines.append(str(year)+' '+correction_line)
break
corrections_list = []
for correction_line in correction_lines:
correction_line_fields = correction_line.split()
if len(correction_line_fields) > 9:
[c_year, moved_date, obs_date, put_date, put_time_str, ant, pad, Bx, By, Bz] = correction_line_fields
s_moved = moved_date[:3]
i_month = 1
for month in MONTHS:
if moved_date.find(month) >= 0:
break
i_month = i_month + 1
moved_time = 10000 * int(c_year) + 100 * i_month + \
int(moved_date[3:])
else:
[c_year, obs_date, put_date, put_time_str, ant, pad, Bx, By, Bz] = correction_line_fields
moved_date = ' '
moved_time = 0
s_obs = obs_date[:3]
i_month = 1
for month in MONTHS:
if s_obs.find(month) >= 0:
break
i_month = i_month + 1
obs_time_2 = 10000 * int(c_year) + 100 * i_month + int(obs_date[3:])
s_put = put_date[:3]
i_month = 1
for month in MONTHS:
if s_put.find(month) >= 0:
break
i_month = i_month + 1
put_time = 10000 * int(c_year) + 100 * i_month + int(put_date[3:])
[put_hr, put_min] = put_time_str.split(':')
put_time += (int(put_hr)/24.0 + int(put_min)/1440.0)
corrections_list.append([c_year, moved_date, moved_time, obs_date, obs_time_2, put_date, put_time, int(ant),
pad, float(Bx), float(By), float(Bz)])
for correction_list in corrections_list:
[c_year, moved_date, moved_time, obs_date, obs_time_2, put_date, put_time, ant, pad, Bx, By, Bz] = correction_list
ant_ind = -1
for ii in range(len(ant_num_stas)):
if ant_num_stas[ii][0] == ant:
ant_ind = ii
break
# make sure the antenna in this correction is in the observation,
# and is not done
if ant_ind != -1 and not ant_num_stas[ant_ind][6]:
ant_num_sta = ant_num_stas[ant_ind]
if moved_time:
# the antenna moved
if moved_time > obs_time:
# we are done considering this antenna
ant_num_stas[ant_ind][6] = True
else:
# otherwise, it moved, so the offsets should be reset
ant_num_stas[ant_ind][3] = 0.0
ant_num_stas[ant_ind][4] = 0.0
ant_num_stas[ant_ind][5] = 0.0
if put_time > obs_time and not ant_num_stas[ant_ind][6] and pad == ant_num_stas[ant_ind][2]:
# it's the right antenna/pad; add the offsets to those already accumulated
ant_num_stas[ant_ind][3] += Bx
ant_num_stas[ant_ind][4] += By
ant_num_stas[ant_ind][5] += Bz
ants = []
parms = []
for ant_num_sta in ant_num_stas:
if ant_num_sta[3] != 0.0 or ant_num_sta[4] != 0.0 or ant_num_sta[3] != 0.0:
if print_offsets:
LOG.info("Offsets for antenna %4s on pad %3s: %8.5f %8.5f %8.5f" %
(ant_num_sta[1], ant_num_sta[2], ant_num_sta[3], ant_num_sta[4], ant_num_sta[5]))
ants.append(ant_num_sta[1])
parms.append(ant_num_sta[3])
parms.append(ant_num_sta[4])
parms.append(ant_num_sta[5])
if len(parms) == 0 and print_offsets:
LOG.info("No offsets found for this MS")
ant_string = ','.join(["%s" % ii for ii in ants])
return [0, ant_string, parms]
[docs]@task_registry.set_equivalent_casa_task('hifv_priorcals')
class Priorcals(basetask.StandardTaskTemplate):
Inputs = PriorcalsInputs
[docs] def prepare(self):
callist = []
gc_result = self._do_gaincurves()
oc_result = self._do_opcal()
rq_result = self._do_rqcal()
sw_result = self._do_swpowcal()
antpos_result, antcorrect = self._do_antpos()
tecmaps_result = None
if self.inputs.tecmaps:
tecmaps_result = self._do_tecmaps()
#try:
# antpos_result.merge_withcontext(self.inputs.context)
#except:
# LOG.error('No antenna position corrections.')
return resultobjects.PriorcalsResults(pool=callist, gc_result=gc_result,
oc_result=oc_result, rq_result=rq_result,
antpos_result=antpos_result, antcorrect=antcorrect,
tecmaps_result=tecmaps_result, sw_result=sw_result)
[docs] def analyse(self, results):
return results
def _do_gaincurves(self):
"""Run gaincurves task"""
inputs = GainCurves.Inputs(self.inputs.context, vis=self.inputs.vis)
task = GainCurves(inputs)
return self._executor.execute(task)
def _do_opcal(self):
"""Run opcal task"""
inputs = Opcal.Inputs(self.inputs.context, vis=self.inputs.vis)
task = Opcal(inputs)
return self._executor.execute(task)
def _do_rqcal(self):
"""Run requantizer gains task"""
inputs = Rqcal.Inputs(self.inputs.context, vis=self.inputs.vis)
task = Rqcal(inputs)
return self._executor.execute(task)
def _do_swpowcal(self):
"""Run switched power task"""
inputs = Swpowcal.Inputs(self.inputs.context, vis=self.inputs.vis, spw=self.inputs.swpow_spw)
task = Swpowcal(inputs)
return self._executor.execute(task)
def _do_antpos(self):
"""Run hif_antpos to correct for antenna positions"""
inputs = Antpos.Inputs(self.inputs.context, vis=self.inputs.vis)
task = Antpos(inputs)
result = self._executor.execute(task)
antcorrect = {}
try:
antpos_caltable = result.final[0].gaintable
if os.path.exists(antpos_caltable):
LOG.info("Start antenna position corrections")
antparamlist = correct_ant_posns(inputs.vis, print_offsets=False)
LOG.info("End antenna position corrections")
self._check_tropdelay(antpos_caltable)
antList = antparamlist[1].split(',')
N=3
subList = [antparamlist[2][n:n+N] for n in range(0, len(antparamlist[2]), N)]
antcorrect = dict(zip(antList, subList))
except Exception as ex:
LOG.info("No offsets found. No caltable created.")
LOG.debug(ex)
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
fracantcorrect = float(len(antcorrect)) / float(len(m.antennas))
if fracantcorrect > 0.5:
LOG.warn("{:5.2f} percent of antennas needed position corrections.".format(100.0 * fracantcorrect))
return result, antcorrect
def _do_tecmaps(self):
"""Run tec_maps function"""
inputs = TecMaps.Inputs(self.inputs.context, vis=self.inputs.vis, output_dir='')
task = TecMaps(inputs)
return self._executor.execute(task)
def _check_tropdelay(self, antpos_caltable):
# Insert value if required for testing
'''
#print "ADDED TEST TROP VALUE"
trdelscale = 1.23
tb = casa_tools.table()
tb.open(antpos_caltable, nomodify=False)
tb.putkeyword('VLATrDelCorr', trdelscale)
tb.close()
#print "END OF ADDING TEST TROP VALUE"
'''
# Detect EVLA 16B Trop Del Corr
# (Silent if required keyword absent, or has value=0.0)
# antpostable = 'cal.antpos'
trdelkw = 'VLATrDelCorr'
with casa_tools.TableReader(antpos_caltable) as tb:
if tb.keywordnames().count(trdelkw) == 1:
trdelscale = tb.getkeyword(trdelkw)
if trdelscale != 0.0:
warning_message = "NB: This EVLA dataset appears to fall within the period of semester 16B " \
"during which the online tropospheric delay model was mis-applied. " \
"A correction for the online tropospheric delay model error WILL BE APPLIED! " \
"Tropospheric delay error correction coefficient="+str(-trdelscale/1000.0)+ " (ps/m) "
LOG.debug("EVLA 16B Online Trop Del Corr is ON, scale=" + str(trdelscale))
LOG.warn(warning_message)