import os
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as caltable_heuristic
from pipeline.infrastructure import task_registry
from . import jyperkreader
from . import worker
from . import jyperkdbaccess
# Module-level logger obtained from the pipeline infrastructure, keyed by this module's name.
LOG = infrastructure.get_logger(__name__)
class SDK2JyCalResults(basetask.Results):
    """Result container for the Kelvin-to-Jy calibration task.

    Attributes set by ``__init__``:
        vis: The value given as ``vis``.
        pool: Shallow copy of the ``pool`` sequence (candidate calibration
            applications).
        final: Shallow copy of the ``final`` sequence (calibration
            applications that get merged into the context).
        error: Initially empty set.
        reffile: The value given as ``reffile``.
        factors: Nested mapping ``{vis: {spw: {antenna: {pol: factor}}}}``
            as consumed by ``merge_with_context``.
        all_ok: The value given as ``all_ok``.
        dbstatus: The value given as ``dbstatus``.
    """

    def __init__(self, vis=None, final=None, pool=None, reffile=None, factors=None,
                 all_ok=False, dbstatus=None):
        """Store the task outcome.

        ``final``, ``pool`` and ``factors`` default to ``None`` (meaning
        "empty") instead of mutable literals, so that instances created with
        the defaults never share one container object.
        """
        super(SDK2JyCalResults, self).__init__()
        self.vis = vis
        self.pool = [] if pool is None else pool[:]
        self.final = [] if final is None else final[:]
        self.error = set()
        self.reffile = reffile
        # An explicitly passed mapping is stored as-is (not copied), as before.
        self.factors = {} if factors is None else factors
        self.all_ok = all_ok
        self.dbstatus = dbstatus

    def merge_with_context(self, context):
        """Register the final calibrations and the Jy/K factors in ``context``.

        Logs an error and returns without touching ``context`` when ``final``
        is empty.
        """
        if not self.final:
            LOG.error('No results to merge')
            return

        for calapp in self.final:
            LOG.debug('Adding calibration to callibrary:\n'
                      '%s\n%s' % (calapp.calto, calapp.calfrom))
            context.callibrary.add(calapp.calto, calapp.calfrom)

        # Merge the k2jy factors into the context by assigning them as an
        # attribute of each MS object, flattened to {(spw, ant, pol): factor}.
        for vis, valid_k2jy in self.factors.items():
            msobj = context.observing_run.get_ms(name=vis)
            msobj.k2jy_factor = {}
            for spwid, spw_k2jy in valid_k2jy.items():
                for ant, ant_k2jy in spw_k2jy.items():
                    for pol, pol_k2jy in ant_k2jy.items():
                        msobj.k2jy_factor[(spwid, ant, pol)] = pol_k2jy

    def __repr__(self):
        """Return a multi-line summary with one line per final calibration."""
        parts = ['SDK2JyCalResults:\n']
        parts.extend(
            '\tBest caltable for spw #{spw} in {vis} is {name}\n'.format(
                spw=calapplication.spw, vis=os.path.basename(calapplication.vis),
                name=calapplication.gaintable)
            for calapplication in self.final)
        return ''.join(parts)
@task_registry.set_equivalent_casa_task('hsd_k2jycal')
@task_registry.set_casa_commands_comment('The Kelvin to Jy calibration tables are generated.')
class SDK2JyCal(basetask.StandardTaskTemplate):
    """Task that generates a Kelvin-to-Jy calibration table.

    Scaling factors are taken from the Jy/K DB when ``inputs.dbservice`` is
    True, and from the reference file ``inputs.reffile`` otherwise or when the
    DB query yields nothing.
    """
    Inputs = SDK2JyCalInputs

    def prepare(self):
        """Collect Jy/K factors and run the worker task that builds the caltable.

        Returns:
            SDK2JyCalResults. When no scaling factor could be obtained, a
            result with an empty ``pool`` is returned after logging an error.
        """
        inputs = self.inputs

        # obtain Jy/K factors
        factors_list = []
        reffile = None

        # dbstatus represents the response from the DB as well as whether or not
        # the task attempted to access the DB
        #
        # dbstatus = None -- not attempted to access (dbservice=False)
        # dbstatus = True -- the DB returned a factor (could be incomplete)
        # dbstatus = False -- the DB didn't return a factor
        dbstatus = None
        if inputs.dbservice is True:
            # Try accessing Jy/K DB if dbservice is True
            reffile = 'jyperk_query.csv'
            factors_list = self._query_factors()
            if len(factors_list) > 0:
                dbstatus = True
                # export factors for future reference
                export_jyperk(reffile, factors_list)
            else:
                dbstatus = False

        if (inputs.dbservice is False) or (len(factors_list) == 0):
            # Read scaling factor file
            reffile = os.path.abspath(os.path.expandvars(os.path.expanduser(inputs.reffile)))
            factors_list = self._read_factors(reffile)

        LOG.debug('factors_list=%s' % factors_list)
        if len(factors_list) == 0:
            LOG.error('No scaling factors available')
            return SDK2JyCalResults(vis=os.path.basename(inputs.vis), pool=[])

        # generate scaling factor dictionary
        factors = rearrange_factors_list(factors_list)

        callist = []
        valid_factors = {}
        all_factors_ok = True

        # Generate the caltable for this MS by delegating to the worker task
        k2jycal_inputs = worker.SDK2JyCalWorker.Inputs(inputs.context, inputs.output_dir, inputs.vis,
                                                       inputs.caltable, factors)
        k2jycal_task = worker.SDK2JyCalWorker(k2jycal_inputs)
        k2jycal_result = self._executor.execute(k2jycal_task)
        if k2jycal_result.calapp is not None:
            callist.append(k2jycal_result.calapp)
        valid_factors[k2jycal_result.vis] = k2jycal_result.ms_factors
        all_factors_ok &= k2jycal_result.factors_ok

        return SDK2JyCalResults(vis=k2jycal_result.vis, pool=callist, reffile=reffile,
                                factors=valid_factors, all_ok=all_factors_ok,
                                dbstatus=dbstatus)

    def analyse(self, result):
        """Promote the caltables that exist on disk to ``result.final``.

        Entries of ``result.pool`` that are missing are recorded in
        ``result.error``. In dry-run mode every pool entry is accepted and
        nothing is reported as missing.
        """
        # With no best caltable to find, our task is simply to set the one
        # caltable as the best result

        # double-check that the caltable was actually generated
        on_disk = [ca for ca in result.pool
                   if ca.exists() or self._executor._dry_run]
        result.final[:] = on_disk

        missing = [ca for ca in result.pool
                   if ca not in on_disk and not self._executor._dry_run]
        result.error.clear()
        result.error.update(missing)

        return result

    def _read_factors(self, reffile):
        """Read the scaling factor list from ``reffile``.

        Args:
            reffile: Path of the scaling factor file. ``prepare`` passes the
                expanded, absolute form of ``inputs.reffile``.

        Returns:
            The list produced by ``jyperkreader.read``, or an empty list when
            ``reffile`` does not exist.
        """
        # Check the path that is actually going to be read. Testing the raw
        # inputs.reffile would wrongly report '~' or '$VAR' paths as missing.
        if not os.path.exists(reffile):
            return []

        # read scaling factor list
        return jyperkreader.read(self.inputs.context, reffile)

    def _query_factors(self):
        """Query the Jy/K DB for the factors of the MS in ``inputs.vis``.

        Returns:
            The factor list returned by the DB access class, or an empty list
            when the query raised an exception (the error is logged as a
            warning so that the caller can fall back to the CSV file).

        Raises:
            RuntimeError: If ``inputs.endpoint`` is not one of 'asdm',
                'model-fit' or 'interpolation'.
        """
        vis = os.path.basename(self.inputs.vis)

        # switch implementation class according to endpoint parameter
        endpoint = self.inputs.endpoint
        if endpoint == 'asdm':
            impl = jyperkdbaccess.JyPerKAsdmEndPoint
        elif endpoint == 'model-fit':
            impl = jyperkdbaccess.JyPerKModelFitEndPoint
        elif endpoint == 'interpolation':
            impl = jyperkdbaccess.JyPerKInterpolationEndPoint
        else:
            raise RuntimeError('Invalid endpoint: {}'.format(endpoint))
        query = impl(self.inputs.context)

        # Deliberately best-effort: any failure of the DB access degrades to
        # an empty result instead of aborting the task.
        try:
            factors_list = query.getJyPerK(vis)
            # warn if result is empty
            if len(factors_list) == 0:
                LOG.warn('{}: Query to Jy/K DB returned empty result. Will fallback to reading CSV file.'.format(vis))
        except Exception as e:
            LOG.warn('{}: Query to Jy/K DB was failed due to the following error. Will fallback to reading CSV file.'.format(vis))
            LOG.warn(str(e))
            factors_list = []
        return factors_list
def rearrange_factors_list(factors_list):
    """
    Rearrange scaling factor list to dictionary which looks like
    {'MS': {'spw': {'Ant': {'pol': factor}}}}

    Each row of factors_list is (vis, antenna, spw, pol, factor). The spw is
    converted with int() and the factor with float(). When the same
    (vis, spw, antenna, pol) appears more than once, the last row wins and
    the replacement is logged.
    """
    factors = {}
    for (vis, ant, spw, pol, _factor) in factors_list:
        spwid = int(spw)
        factor = float(_factor)
        # Create the intermediate levels on demand and get the innermost
        # {pol: factor} mapping for this vis/spw/antenna.
        pol_factors = factors.setdefault(vis, {}).setdefault(spwid, {}).setdefault(ant, {})
        if pol in pol_factors:
            # The new value replaces the stored one, so report it in that order.
            LOG.info('There are duplicate rows in reffile, use %s instead of %s for (%s,%s,%s,%s)' %
                     (factor, pol_factors[pol], vis, spwid, ant, pol))
        pol_factors[pol] = factor
    return factors
def export_jyperk(outfile, factors):
    """Append scaling factor rows to a CSV file, creating it if necessary.

    Args:
        outfile: Path of the CSV file. When it does not exist yet, it is
            created and the header line is written first.
        factors: Iterable of rows; each row is an iterable of fields that are
            written comma-separated on one line. Fields are converted with
            str(), so numeric values are accepted as well as strings.
    """
    # Decide before opening: append mode creates the file, which would hide
    # the fact that the header is still missing.
    needs_header = not os.path.exists(outfile)
    with open(outfile, 'a') as f:
        if needs_header:
            # create file with header information
            f.write('MS,Antenna,Spwid,Polarization,Factor\n')
        for row in factors:
            f.write('{}\n'.format(','.join(str(field) for field in row)))