import collections
import os
import numpy as np
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.api as api
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.contfilehandler as contfilehandler
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as uvcaltable
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
LOG = infrastructure.get_logger(__name__)
# Fit the continuum in the UV plane using the CASA style
# uvcontfit task written by the pipeline.
# tell the infrastructure to give us mstransformed data when possible by
# registering our preference for imaging measurement sets
api.ImagingMeasurementSetsPreferred.register(UVcontFitInputs)
[docs]@task_registry.set_equivalent_casa_task('hif_uvcontfit')
class UVcontFit(basetask.StandardTaskTemplate):
Inputs = UVcontFitInputs
[docs] def prepare(self):
inputs = self.inputs
# Check for size mitigation errors.
if 'status' in inputs.context.size_mitigation_parameters and \
inputs.context.size_mitigation_parameters['status'] == 'ERROR':
result = UVcontFitResults()
result.mitigation_error = True
return result
# Compute the spw list from the frequency selection string
# This is passed to callibrary
orig_spw = ','.join([spw.split(':')[0] for spw in inputs.spw.split(',')])
calapps = []
spwdict = {}
if not inputs.contfile:
# Simple case frequency selection same for all sources
# Store input string as is done for the continuum file case
spwdict['all'] = inputs.spwstr
# Create the caltable without a source name
caltable = inputs.caltable
# Execute
uvcontfit_args = inputs.to_casa_args(caltable)
uvcontfit_job = casa_tasks.uvcontfit(**uvcontfit_args)
self._executor.execute(uvcontfit_job)
# Create the callibrary object
calto = callibrary.CalTo(vis=inputs.vis, spw=orig_spw, intent=inputs.intent)
calfrom = callibrary.CalFrom(caltable,
caltype='uvcont',
spwmap=[],
interp='',
calwt=False)
calapps.append(callibrary.CalApplication(calto, calfrom))
else:
# Get the continuum ranges
cranges_spwsel = self._get_ranges_spwsel()
# Save the original list of intents
orig_intent = inputs.intent
# Loop over the ranges calling uvcontfit once per source
for sname in cranges_spwsel:
# Translate to field selection
sfields, sintents = self._get_source_fields(sname)
if not sfields:
continue
# Accumulate spw selection string for this source
source_cranges = cranges_spwsel[sname]
spw_cranges = ['%s:%s' % (spw_id, source_cranges[spw_id].split()[0])
for spw_id in source_cranges
if source_cranges[spw_id] not in ['', 'NONE']]
spwstr = ','.join(spw_cranges)
spwstr = spwstr.replace(':ALL', '')
if not spwstr:
continue
# Create the caltable with a source name
# Not ideal because of the way inputs works but ...
caltable = uvcaltable.UVcontCaltable()
caltable = caltable(output_dir=inputs.output_dir, stage=inputs.context.stage, vis=inputs.vis,
source=sname)
spwdict[sname] = spwstr
# Fire off task
inputs.intent = sintents
uvcontfit_args = inputs.to_casa_args(caltable, field=sfields, spw=spwstr, append=False)
uvcontfit_job = casa_tasks.uvcontfit(**uvcontfit_args)
self._executor.execute(uvcontfit_job)
# Create the callibrary object
calto = callibrary.CalTo(vis=inputs.vis, field=sname, spw=orig_spw, intent=inputs.intent)
calfrom = callibrary.CalFrom(caltable,
caltype='uvcont',
spwmap=[],
interp='',
calwt=False)
calapps.append(callibrary.CalApplication(calto, calfrom))
inputs.intent = orig_intent
return UVcontFitResults(spwdict=spwdict, pool=calapps)
[docs] def analyse(self, result):
# With no best caltable to find, our task is simply to set the one
# caltable as the best result
# double-check that the caltable was actually generated
on_disk = [ca for ca in result.pool
if ca.exists() or self._executor._dry_run]
result.final[:] = on_disk
missing = [ca for ca in result.pool
if ca not in on_disk and not self._executor._dry_run]
result.error.clear()
result.error.update(missing)
return result
def _get_ranges_spwsel(self):
inputs = self.inputs
# Read continuum file
contfile_handler = contfilehandler.ContFileHandler(inputs.contfile, warn_nonexist=True)
# Get all the selected fields
all_fields = inputs.ms.get_fields(task_arg=inputs.field)
# Get all the associated sources
all_sources = [f.source for f in all_fields]
all_source_names = list({f.source.name for f in all_fields})
# Collect the merged ranges
# Error checking ?
cranges_spwsel = collections.OrderedDict()
for sname in all_source_names:
source_fields = [s.fields for s in all_sources if s.name == sname][0]
if len(source_fields) > 1:
rep_field_id, rep_field_name = self._get_rep_field(source_fields)
if rep_field_id < 0:
rep_field_id = source_fields[1].id
rep_field_name = source_fields[1].name
else:
rep_field_id = source_fields[0].id
rep_field_name = source_fields[0].name
LOG.info('Representative field for MS %s source %s is field %s with id %d' % (inputs.ms.basename, sname,
rep_field_name, rep_field_id))
cranges_spwsel[sname] = collections.OrderedDict()
for spw_id in [str(spw.id) for spw in inputs.ms.get_spectral_windows(task_arg=inputs.spw)]:
# convert real to virtual spw id
virtual_spw_id = str(inputs.context.observing_run.real2virtual_spw_id (int(spw_id), inputs.ms))
# cranges_spwsel[sname][spw_id], _ = contfile_handler.get_merged_selection(sname, spw_id)
cranges_spwsel[sname][spw_id], _ = contfile_handler.get_merged_selection(sname, virtual_spw_id)
if not cranges_spwsel[sname][spw_id]:
LOG.info('No continuum region detection attempted for MS %s source %s spw %d' % (
inputs.ms.basename, sname, int(spw_id)))
continue
elif cranges_spwsel[sname][spw_id] in ['NONE']:
LOG.warn('Continuum region detection failed for MS %s source %s spw %d' % (
inputs.ms.basename, sname, int(spw_id)))
continue
else:
LOG.info('Input continuum frequency ranges for MS %s and spw %d are %s' % (
inputs.ms.basename, int(spw_id), cranges_spwsel[sname][spw_id]))
try:
# freq_ranges, chan_ranges, aggregate_lsrk_bw = contfile_handler.lsrk_to_topo(
# cranges_spwsel[sname][spw_id], [inputs.vis], [rep_field_id], int(spw_id),
# self.inputs.context.observing_run)
freq_ranges, chan_ranges, aggregate_lsrk_bw = contfile_handler.lsrk_to_topo(
cranges_spwsel[sname][spw_id], [inputs.vis], [rep_field_id], int(virtual_spw_id),
self.inputs.context.observing_run)
LOG.info('Output continuum frequency range for MS %s and spw %d are %s' % (
inputs.ms.basename, int(spw_id), freq_ranges[0]))
LOG.info('Output continuum channel ranges for MS %s and spw %d are %s' % (
inputs.ms.basename, int(spw_id), chan_ranges[0]))
cranges_spwsel[sname][spw_id] = freq_ranges[0]
except:
LOG.info('Output continuum frequency ranges for MS %s and spw %d are %s' % (
inputs.ms.basename, int(spw_id), cranges_spwsel[sname][spw_id]))
return cranges_spwsel
# This code is a duplciate of the imaging heuristics code which
# estimates the phase center of a mosaic.
# This and the imaging code heuristics should be refactored at
# some point.
def _get_rep_field(self, source_fields):
# Initialize
rep_field = -1
rep_field_name = ''
# Get CASA tools
cqa = casa_tools.quanta
cme = casa_tools.measures
# First estimate the phase center
# Get the individual field directions in ICRS coordinates
mdirections = []
for field in source_fields:
phase_dir = cme.measure(field.mdirection, 'ICRS')
mdirections.append(phase_dir)
# Compute offsets from field 0.
xsep = []
ysep = []
for mdirection in mdirections:
pa = cme.posangle(mdirections[0], mdirection)
sep = cme.separation(mdirections[0], mdirection)
xs = cqa.mul(sep, cqa.sin(pa))
ys = cqa.mul(sep, cqa.cos(pa))
xs = cqa.convert(xs, 'arcsec')
ys = cqa.convert(ys, 'arcsec')
xsep.append(cqa.getvalue(xs))
ysep.append(cqa.getvalue(ys))
xsep = np.array(xsep)
ysep = np.array(ysep)
# Estimate the x and y center offsets
xcen = xsep.min() + (xsep.max() - xsep.min()) / 2.0
ycen = ysep.min() + (ysep.max() - ysep.min()) / 2.0
# Initialize phase center
ref = cme.getref(mdirections[0])
md = cme.getvalue(mdirections[0])
m0 = cqa.quantity(md['m0'])
m1 = cqa.quantity(md['m1'])
# Get direction of image centre crudely by adding offset
# of center to ref values of first field.
m0 = cqa.add(m0, cqa.div('%sarcsec' % xcen, cqa.cos(m1)))
m1 = cqa.add(m1, '%sarcsec' % ycen)
if ref == 'ICRS' or ref == 'J2000' or ref == 'B1950':
m0 = cqa.time(m0, prec=10)[0]
else:
m0 = cqa.angle(m0, prec=9)[0]
m1 = cqa.angle(m1, prec=9)[0]
phase_center = '%s %s %s' % (ref, m0, m1)
phase_dir = cme.source(phase_center)
# Then find the field with smallest separation from the phase center
field_ids = [f.id for f in source_fields]
field_names = [f.name for f in source_fields]
separations = [cme.separation(phase_dir, f.mdirection)['value'] for f in source_fields]
index = separations.index(min(separations))
rep_field = field_ids[index]
rep_field_name = field_names[index]
return rep_field, rep_field_name
def _get_source_fields(self, sname):
inputs = self.inputs
# Get fields which match the input selection and
# filter on source name. Use field names for now
# The '''' are a work around
fields = inputs.ms.get_fields(task_arg=inputs.field)
unique_field_names = {f.name for f in fields if (f.name == sname or f.name == '"'+sname+'"')}
field_ids = {f.id for f in fields if (f.name == sname or f.name == '"'+sname+'"')}
# Add proper intent filter
# May not be necessary
field_intents = []
for f in fields:
if f.name == sname or f.name == '"'+sname+'"':
field_intents.extend(list(f.intents))
field_intents = set(inputs.intent.split(',')).intersection(set(field_intents))
# Fields with different intents may have the same name. Check for this
# and return the IDs if necessary
if len(unique_field_names) is len(field_ids):
fieldstr = ','.join(unique_field_names)
intentstr = ','.join(field_intents)
else:
fieldstr = ','.join([str(i) for i in field_ids])
intentstr = ','.join(field_intents)
return fieldstr, intentstr
[docs]class UVcontFitResults(basetask.Results):
def __init__(self, spwdict=None, final=None, pool=None, preceding=None):
if spwdict is None:
spwdict = {}
if final is None:
final = []
if pool is None:
pool = []
if preceding is None:
preceding = []
super(UVcontFitResults, self).__init__()
self.spwdict = spwdict
self.pool = pool[:]
self.final = final[:]
self.preceding = preceding[:]
self.error = set()
self.mitigation_error = False
[docs] def merge_with_context(self, context):
"""
See :method:`~pipeline.api.Results.merge_with_context`
"""
if not self.final:
LOG.warn('No UV continuum results to merge')
return
for calapp in self.final:
LOG.debug('Adding calibration to callibrary:\n'
'%s\n%s' % (calapp.calto, calapp.calfrom))
context.callibrary.add(calapp.calto, calapp.calfrom)
def __repr__(self):
s = 'UVcontFitResults:\n'
for calapplication in self.final:
s += '\tBest caltable for spw #{spw} in {vis} is {name}\n'.format(
spw=calapplication.spw, vis=os.path.basename(calapplication.vis),
name=calapplication.gaintable)
return s