Source code for pipeline.hif.tasks.antpos.antpos
import csv
import os
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as acaltable
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
# Public names of this module: the task class, its inputs class and its
# results class.
__all__ = ['Antpos', 'AntposInputs', 'AntposResults']

# Module-level logger, named after this module.
LOG = infrastructure.get_logger(__name__)
class AntposResults(basetask.Results):
    """
    Results object produced by the Antpos task.

    Attributes assigned by the constructor:

    - pool: shallow copy of the candidate calibration applications
    - final: shallow copy of the calibration applications that
      merge_with_context registers with the callibrary
    - preceding: shallow copy of the ``preceding`` argument
    - error: a set, empty on construction
    - antenna: the antenna argument, stored as given
    - offsets: the offsets argument, stored as given (a new empty list when
      omitted)
    """

    def __init__(self, final=None, pool=None, preceding=None, antenna='', offsets=None):
        """
        Create the results object.

        All list arguments default to None rather than ``[]`` so that no
        mutable default object is shared between instances; None is treated
        as an empty list.

        :param final: calibration applications to merge into the context
        :param pool: all candidate calibration applications
        :param preceding: stored (as a copy) on the ``preceding`` attribute
        :param antenna: antenna selection to record on the result
        :param offsets: offsets to record on the result
        """
        super(AntposResults, self).__init__()
        self.pool = [] if pool is None else pool[:]
        self.final = [] if final is None else final[:]
        self.preceding = [] if preceding is None else preceding[:]
        self.error = set()
        self.antenna = antenna
        # A caller-supplied object is stored as-is (not copied); only the
        # default is replaced by a fresh list per instance.
        self.offsets = [] if offsets is None else offsets

    def merge_with_context(self, context):
        """
        See :meth:`~pipeline.api.Results.merge_with_context`

        Adds the calto/calfrom pair of every entry in ``self.final`` to
        ``context.callibrary``. Logs a warning and does nothing else when
        ``self.final`` is empty.
        """
        if not self.final:
            LOG.warning('No antenna position results to merge')
            return

        for calapp in self.final:
            # Arguments are passed to the logger so the message is only
            # formatted when debug output is enabled.
            LOG.debug('Adding calibration to callibrary:\n%s\n%s', calapp.calto, calapp.calfrom)
            context.callibrary.add(calapp.calto, calapp.calfrom)

    def __repr__(self):
        """Return a header line followed by one line per entry in ``self.final``."""
        lines = ['AntposResults:\n']
        for calapplication in self.final:
            lines.append('\tBest caltable for spw #{spw} in {vis} is {name}\n'.format(
                spw=calapplication.spw, vis=os.path.basename(calapplication.vis),
                name=calapplication.gaintable))
        return ''.join(lines)
class AntposInputs(vdp.StandardInputs):
    """
    AntposInputs defines the inputs for the Antpos pipeline task.

    The ``hm_antpos`` input selects where the antenna list and offsets passed
    to CASA come from (see to_casa_args):

    - 'manual': the ``antenna`` and ``offsets`` inputs are used directly
    - 'file': they are read from the CSV file ``antposfile`` located in
      ``output_dir``
    - any other value: an empty antenna string and an empty offsets list
    """
    antenna = vdp.VisDependentProperty(default='')
    antposfile = vdp.VisDependentProperty(default='antennapos.csv')
    hm_antpos = vdp.VisDependentProperty(default='manual')

    @vdp.VisDependentProperty
    def offsets(self):
        # Default: no offsets.
        return []

    @vdp.VisDependentProperty
    def caltable(self):
        """
        Get the caltable argument for these inputs.

        If set to a table-naming heuristic, this should give a sensible name
        considering the current CASA task arguments.
        """
        namer = acaltable.AntposCaltable()
        # 'caltable' is excluded from the task arguments handed to the namer.
        casa_args = self._get_task_args(ignore=('caltable',))
        return namer.calculate(output_dir=self.output_dir, stage=self.context.stage, **casa_args)

    def __init__(self, context, output_dir=None, vis=None, caltable=None, hm_antpos=None, antposfile=None, antenna=None,
                 offsets=None):
        super(AntposInputs, self).__init__()

        # pipeline inputs
        self.context = context
        # vis must be set first, as other properties may depend on it
        self.vis = vis
        self.output_dir = output_dir

        # data selection arguments
        self.antenna = antenna
        self.antposfile = antposfile
        self.caltable = caltable

        # solution parameters
        self.hm_antpos = hm_antpos
        self.offsets = offsets

    def to_casa_args(self):
        """
        Build the argument dictionary for the CASA call.

        :return: dict with the keys 'vis', 'caltable', 'antenna' and
            'parameter'; 'antenna' and 'parameter' are chosen according to
            ``hm_antpos``
        """
        # Get the antenna and offset lists.
        if self.hm_antpos == 'manual':
            antenna = self.antenna
            offsets = self.offsets
        elif self.hm_antpos == 'file':
            filename = os.path.join(self.output_dir, self.antposfile)
            antenna, offsets = self._read_antpos_csvfile(
                filename, os.path.basename(self.vis))
        else:
            antenna = ''
            offsets = []

        return {'vis': self.vis,
                'caltable': self.caltable,
                'antenna': antenna,
                'parameter': offsets}

    def _read_antpos_txtfile(self, filename):
        """
        Read antenna offsets from a text file or a list of text files.

        :param filename: a file name, or a list of file names whose contents
            are merged in list order
        :return: tuple of (comma-separated antenna names, flat list of
            x, y, z offsets as floats)
        """
        # If the input is a list of file names, call this function
        # recursively on each entry and merge the results. (This previously
        # called a non-existent _add_file method.)
        if isinstance(filename, list):
            merged_antennas = []
            merged_parameters = []
            for name in filename:
                antenna_str, offsets = self._read_antpos_txtfile(name)
                # skip empty strings so that no stray commas are produced
                if antenna_str:
                    merged_antennas.append(antenna_str)
                merged_parameters.extend(offsets)
            return ','.join(merged_antennas), merged_parameters

        # This assumes a very simple antenna offsets file format
        #     Blank lines are skipped
        #     Comment lines start with # and are skipped
        #     Each line must contain at least 4 white spaced delimited fields
        #     containing the antenna name, x offset, y offset, and z offset
        # Rewrite this when we know the real format
        antennas = []
        parameters = []
        with open(filename, 'r') as datafile:
            for line in datafile:
                # Test the stripped line so that indented comment lines are
                # skipped as well.
                stripped = line.strip()
                if not stripped or stripped.startswith('#'):
                    continue
                fields = stripped.split()
                if len(fields) < 4:
                    continue
                antennas.append(fields[0])
                parameters.extend(
                    [float(fields[1]), float(fields[2]), float(fields[3])])

        # Convert the list to a string since CASA wants it that way?
        return ','.join(antennas), parameters

    @staticmethod
    def _read_antpos_csvfile(filename, msbasename):
        """
        Read the antenna offsets for one measurement set from a CSV file.

        A missing file, an empty file, or a file without matching rows all
        yield ``('', [])``. Offsets that cannot be converted to float raise
        ValueError.

        :param filename: path of the CSV file
        :param msbasename: only rows whose first column equals this value
            are used
        :return: tuple of (comma-separated antenna names, flat list of
            x, y, z offsets as floats)
        """
        # This assumes a very simple csv file format containing the following
        # columns
        #     ms
        #     antenna
        #     xoffset in meters
        #     yoffset in meters
        #     zoffset in meters
        #     comment
        # Rewrite this when we know the real format
        antennas = []
        parameters = []

        if not os.path.exists(filename):
            LOG.warning('Antenna position offsets file does not exist')
            return ','.join(antennas), parameters

        with open(filename, 'rt') as f:
            reader = csv.reader(f)

            # First row is header row. The default stops an empty file from
            # raising StopIteration.
            next(reader, None)

            # Loop over the rows
            for row in reader:
                if len(row) < 5:
                    # csv.reader yields [] for blank lines; skip those
                    # silently and report truncated rows.
                    if row:
                        LOG.warning('Skipping malformed row in antenna position offsets file: %s', row)
                    continue
                # Only the first five columns are needed; the comment column
                # and anything after it are ignored.
                ms_name, ant_name, xoffset, yoffset, zoffset = row[:5]
                if ms_name != msbasename:
                    continue
                antennas.append(ant_name)
                parameters.extend(
                    [float(xoffset), float(yoffset), float(zoffset)])

        # Convert the list to a string since CASA wants it that way?
        return ','.join(antennas), parameters
@task_registry.set_equivalent_casa_task('hif_antpos')
class Antpos(basetask.StandardTaskTemplate):
    """
    Task that builds a gencal job of caltype 'antpos' and wraps the resulting
    caltable in an AntposResults object.
    """
    Inputs = AntposInputs

    def prepare(self):
        """
        Build the gencal job, execute it unless there is nothing to apply,
        and return the results with the caltable in the result pool.
        """
        task_inputs = self.inputs
        casa_args = task_inputs.to_casa_args()
        job = casa_tasks.gencal(caltype='antpos', **casa_args)

        # In 'file' mode an empty antenna string means that no offsets were
        # read for this measurement set, so the job is not executed.
        nothing_to_apply = task_inputs.hm_antpos == 'file' and casa_args['antenna'] == ''
        if nothing_to_apply:
            LOG.info('No antenna position offsets are defined')
        else:
            self._executor.execute(job)

        target = callibrary.CalTo(vis=task_inputs.vis)
        # careful now! Calling inputs.caltable mid-task will remove the
        # newly-created caltable, so we must look at the task arguments
        # instead
        source = callibrary.CalFrom(casa_args['caltable'],
                                    caltype='antpos',
                                    spwmap=[],
                                    interp='', calwt=False)
        application = callibrary.CalApplication(target, source)

        return AntposResults(pool=[application], antenna=casa_args['antenna'],
                             offsets=casa_args['parameter'])

    def analyse(self, result):
        """
        Accept every pooled calibration whose caltable exists (or all of
        them during a dry run) as final, and record the others as errors.
        """
        dry_run = self._executor._dry_run

        # With no best caltable to find, our task is simply to set the one
        # caltable as the best result.
        # Double-check that the caltable was actually generated.
        present = [candidate for candidate in result.pool
                   if candidate.exists() or dry_run]
        result.final[:] = present

        absent = [candidate for candidate in result.pool
                  if not (candidate in present or dry_run)]
        result.error.clear()
        result.error.update(absent)

        return result