import collections
import copy
import glob
import math
import operator
import os.path
import re
import shutil
import uuid
import numpy as np
from casatasks.private.imagerhelpers.imager_base import PySynthesisImager
from casatasks.private.imagerhelpers.imager_parallel_continuum import PyParallelContSynthesisImager
from casatasks.private.imagerhelpers.input_parameters import ImagerParameters
import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.contfilehandler as contfilehandler
import pipeline.infrastructure.filenamer as filenamer
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.utils as utils
from pipeline.hif.heuristics import mosaicoverlap
from pipeline.infrastructure import casa_tools
LOG = infrastructure.get_logger(__name__)
[docs]class ImageParamsHeuristics(object):
"""
Image parameters heuristics base class. One instance is made per make/editimlist
call. There are subclasses for different imaging modes such as ALMA
or VLASS.
"""
def __deepcopy__(self, memodict=None):
# Save memory by using a shallow copy of the ObservingRun
return self.__class__(
copy.deepcopy(self.vislist),
copy.deepcopy(self.spw),
self.observing_run,
self.imagename_prefix,
copy.deepcopy(self.proj_params),
self.contfile,
self.linesfile,
copy.deepcopy(self.imaging_params)
)
def __init__(self, vislist, spw, observing_run, imagename_prefix='', proj_params=None, contfile=None,
linesfile=None, imaging_params={}):
"""
:param vislist: the list of MS names
:type vislist: list of strings
:param spw: the virtual spw specification (list of virtual spw IDs)
:type spw_name: string or list
"""
self.imaging_mode = 'BASE'
self.observing_run = observing_run
self.imagename_prefix = imagename_prefix
self.proj_params = proj_params
if isinstance(vislist, list):
self.vislist = vislist
else:
self.vislist = [vislist]
self.spw = spw
self.contfile = contfile
self.linesfile = linesfile
self.imaging_params = imaging_params
# split spw into list of spw parameters for 'clean'
spwlist = spw.replace('[', '').replace(']', '').strip()
spwlist = spwlist.split("','")
spwlist[0] = spwlist[0].strip("'")
spwlist[-1] = spwlist[-1].strip("'")
# find all the spwids present in the list
p = re.compile(r"[ ,]+(\d+)")
self.spwids = set()
for spwclean in spwlist:
spwidsclean = p.findall(' %s' % spwclean)
spwidsclean = list(map(int, spwidsclean))
self.spwids.update(spwidsclean)
[docs] def primary_beam_size(self, spwid, intent):
'''Calculate primary beam size in arcsec.'''
cqa = casa_tools.quanta
# get the diameter of the smallest antenna used among all vis sets
antenna_ids = self.antenna_ids(intent)
diameters = []
for vis in self.vislist:
ms = self.observing_run.get_ms(name=vis)
antennas = ms.antennas
for antenna in antennas:
if antenna.id in antenna_ids[os.path.basename(vis)]:
diameters.append(antenna.diameter)
smallest_diameter = np.min(np.array(diameters))
# get spw info from first vis set, assume spws uniform
# across datasets
msname = self.vislist[0]
real_spwid = self.observing_run.virtual2real_spw_id(spwid, self.observing_run.get_ms(msname))
ms = self.observing_run.get_ms(name=msname)
spw = ms.get_spectral_window(real_spwid)
ref_frequency = float(
spw.ref_frequency.to_units(measures.FrequencyUnits.HERTZ))
# use the smallest antenna diameter and the reference frequency
# to estimate the primary beam radius -
# radius of first null in arcsec = 1.22*lambda/D
primary_beam_size = \
1.22 \
* cqa.getvalue(cqa.convert(cqa.constants('c'), 'm/s')) \
/ ref_frequency \
/ smallest_diameter \
* (180.0 * 3600.0 / math.pi)
return primary_beam_size
[docs] def cont_ranges_spwsel(self):
"""Determine spw selection parameters to exclude lines for mfs and cont images."""
# initialize lookup dictionary for all possible source names
cont_ranges_spwsel = {}
all_continuum_spwsel = {}
for ms_ref in self.observing_run.get_measurement_sets():
for source_name in [s.name for s in ms_ref.sources]:
cont_ranges_spwsel[source_name] = {}
all_continuum_spwsel[source_name] = {}
for spwid in self.spwids:
cont_ranges_spwsel[source_name][str(spwid)] = ''
all_continuum_spwsel[source_name][str(spwid)] = False
contfile = self.contfile if self.contfile is not None else ''
linesfile = self.linesfile if self.linesfile is not None else ''
# read and merge continuum regions if contfile exists
if os.path.isfile(contfile):
LOG.info('Using continuum frequency ranges from %s to calculate continuum frequency selections.' % (contfile))
contfile_handler = contfilehandler.ContFileHandler(contfile, warn_nonexist=True)
# Collect the merged the ranges
for field_name in cont_ranges_spwsel:
for spw_id in cont_ranges_spwsel[field_name]:
cont_ranges_spwsel[field_name][spw_id], all_continuum_spwsel[field_name][spw_id] = contfile_handler.get_merged_selection(field_name, spw_id)
# alternatively read and merge line regions and calculate continuum regions
elif os.path.isfile(linesfile):
LOG.info('Using line frequency ranges from %s to calculate continuum frequency selections.' % (linesfile))
p = re.compile('([\d.]*)(~)([\d.]*)(\D*)')
try:
line_regions = p.findall(open(linesfile, 'r').read().replace('\n', '').replace(';', '').replace(' ', ''))
except Exception:
line_regions = []
line_ranges_GHz = []
for line_region in line_regions:
try:
fLow = casa_tools.quanta.convert('%s%s' % (line_region[0], line_region[3]), 'GHz')['value']
fHigh = casa_tools.quanta.convert('%s%s' % (line_region[2], line_region[3]), 'GHz')['value']
line_ranges_GHz.append((fLow, fHigh))
except:
pass
merged_line_ranges_GHz = [r for r in utils.merge_ranges(line_ranges_GHz)]
# get source and spw info from first vis set, assume spws uniform
# across datasets
msname = self.vislist[0]
ms = self.observing_run.get_ms(name=msname)
for spwid in self.spwids:
real_spwid = self.observing_run.virtual2real_spw_id(spwid, self.observing_run.get_ms(msname))
spw = ms.get_spectral_window(real_spwid)
# assemble continuum spw selection
min_frequency = float(spw.min_frequency.to_units(measures.FrequencyUnits.GIGAHERTZ))
max_frequency = float(spw.max_frequency.to_units(measures.FrequencyUnits.GIGAHERTZ))
spw_sel_intervals = utils.spw_intersect([min_frequency, max_frequency], merged_line_ranges_GHz)
spw_selection = ';'.join(['%.10f~%.10fGHz' % (float(spw_sel_interval[0]), float(spw_sel_interval[1])) for spw_sel_interval in spw_sel_intervals])
# Skip selection syntax completely if the whole spw is selected
if (spw_selection == '%.10f~%.10fGHz' % (float(min_frequency), float(max_frequency))):
spw_selection = ''
for source_name in [s.name for s in ms.sources]:
cont_ranges_spwsel[source_name][str(spwid)] = '%s LSRK' % (spw_selection)
return cont_ranges_spwsel, all_continuum_spwsel
[docs] def field_intent_list(self, intent, field):
intent_list = intent.split(',')
field_list = utils.safe_split(field)
field_intent_result = set()
for vis in self.vislist:
ms = self.observing_run.get_ms(name=vis)
fields = ms.fields
if intent.strip() is not '':
for eachintent in intent_list:
re_intent = eachintent.replace('*', '.*')
field_intent_result.update(
[(fld.name, eachintent) for fld in fields if
re.search(pattern=re_intent, string=str(fld.intents))])
if field.strip() is not '':
# remove items from field_intent_result that are not
# consistent with the fields in field_list
temp_result = set()
for eachfield in field_list:
re_field = utils.dequote(eachfield)
temp_result.update([fir for fir in field_intent_result if utils.dequote(fir[0]) == re_field])
field_intent_result = temp_result
else:
if field.strip() is not '':
for f in field_list:
fintents_list = [fld.intents for fld in fields if utils.dequote(fld.name) == utils.dequote(f)]
for fintents in fintents_list:
for fintent in fintents:
field_intent_result.update((f, fintent))
# eliminate redundant copies of field/intent keys that map to the
# same data - to prevent duplicate images being produced
done_vis_scanids = []
# Loop through a reverse sorted list to make sure that the AMPLITUDE
# intent is removed for cases of shared BANDPASS and AMPLITUDE intents
# (PIPE-199). This should be done more explicitly rather than relying
# on the alphabetical order, but as it requires more refactoring, there
# was no time for this before the Cycle 7 deadline.
# TODO: Save duplicate intents for weblog rendering.
for field_intent in sorted(list(field_intent_result), reverse=True):
field = field_intent[0]
intent = field_intent[1]
re_field = utils.dequote(field)
vis_scanids = {}
for vis in self.vislist:
ms = self.observing_run.get_ms(name=vis)
scanids = sorted(
[scan.id for scan in ms.scans
if intent in scan.intents and re_field in [utils.dequote(f.name) for f in scan.fields]])
vis_scanids[vis] = scanids
if vis_scanids in done_vis_scanids:
LOG.info(
'field: %s intent: %s is a duplicate - removing from imlist' %
(field, intent))
field_intent_result.discard(field_intent)
else:
done_vis_scanids.append(vis_scanids)
return field_intent_result
[docs] def get_scanidlist(self, vis, field, intent):
# Use scanids to select data with the specified intent
# Note CASA clean now supports intent selection but leave
# this logic in place and use it to eliminate vis that
# don't contain the requested data.
scanidlist = []
visindexlist = []
for i in range(len(vis)):
allscanids = []
for fieldname in field.split(','):
re_field = utils.dequote(fieldname)
ms = self.observing_run.get_ms(name=vis[i])
scanids = [scan.id for scan in ms.scans if
intent in scan.intents and
((re_field in [utils.dequote(f.name) for f in scan.fields]) or
(re_field in [str(f.id) for f in scan.fields]))]
if scanids == []:
continue
allscanids.extend(scanids)
if allscanids != []:
allscanids = ','.join(map(str, sorted(list(set(allscanids)))))
scanidlist.append(allscanids)
visindexlist.append(i)
return scanidlist, visindexlist
[docs] def largest_primary_beam_size(self, spwspec, intent):
# get the spwids in spwspec
p = re.compile(r"[ ,]+(\d+)")
spwids = p.findall(' %s' % spwspec)
spwids = set(map(int, spwids))
# find largest beam among spws in spwspec
largest_primary_beam_size = 0.0
for spwid in spwids:
largest_primary_beam_size = max(largest_primary_beam_size, self.primary_beam_size(spwid, intent))
return largest_primary_beam_size
[docs] def synthesized_beam(self, field_intent_list, spwspec, robust=0.5, uvtaper=[], pixperbeam=5.0, known_beams={},
force_calc=False, parallel='automatic', shift=False):
"""Calculate synthesized beam for a given field / spw selection."""
qaTool = casa_tools.quanta
# Need to work on a local copy of known_beams to avoid setting the
# method default value inadvertently
local_known_beams = copy.deepcopy(known_beams)
# reset state of imager
casa_tools.imager.done()
# get the spwids in spwspec - imager tool likes these rather than
# a string
p = re.compile(r"[ ,]+(\d+)")
spwids = p.findall(' %s' % spwspec)
spwids = set(map(int, spwids))
# Select only the highest frequency spw to get the smallest beam
# ref_ms = self.observing_run.get_ms(self.vislist[0])
# max_freq = 0.0
# max_freq_spwid = -1
# for spwid in spwids:
# real_spwid = self.observing_run.virtual2real_spw_id(spwid, ref_ms)
# spwid_centre_freq = ref_ms.get_spectral_window(real_spwid).centre_frequency.to_units(measures.FrequencyUnits.HERTZ)
# if spwid_centre_freq > max_freq:
# max_freq = spwid_centre_freq
# max_freq_spwid = spwid
# spwids = [max_freq_spwid]
# find largest primary beam size among spws in spwspec
if list(field_intent_list) != []:
largest_primary_beam_size = self.largest_primary_beam_size(spwspec, list(field_intent_list)[0][1])
else:
largest_primary_beam_size = self.largest_primary_beam_size(spwspec, 'TARGET')
# put code in try-block to ensure that imager.done gets
# called at the end
valid_data = {}
makepsf_beams = []
try:
for field, intent in field_intent_list:
try:
if force_calc:
# Reset flag to avoid clearing the dictionary several times
force_calc = False
local_known_beams = {}
LOG.info('Got manual request to recalculate beams.')
raise Exception('Got manual request to recalculate beams.')
if local_known_beams['robust'] != robust:
if local_known_beams['robust'] is None:
logMsg = 'robust value changed (old: None, new: %.1f). Re-calculating beams.' % (robust)
else:
logMsg = 'robust value changed (old: %.1f, new: %.1f). Re-calculating beams.' % (local_known_beams['robust'], robust)
LOG.info(logMsg)
local_known_beams = {}
raise Exception(logMsg)
if local_known_beams['uvtaper'] != uvtaper:
LOG.info('uvtaper value changed (old: %s, new: %s). Re-calculating beams.' % (str(local_known_beams['uvtaper']), str(uvtaper)))
local_known_beams = {}
raise Exception('uvtaper value changed (old: %s, new: %s). Re-calculating beams.' % (str(local_known_beams['uvtaper']), str(uvtaper)))
makepsf_beam = local_known_beams[field][intent][','.join(map(str, sorted(spwids)))]['beam']
LOG.info('Using previously calculated beam of %s for Field %s Intent %s SPW %s' %
(str(makepsf_beam), field, intent, ','.join(map(str, sorted(spwids)))))
if makepsf_beam != 'invalid':
makepsf_beams.append(makepsf_beam)
except Exception:
local_known_beams['recalc'] = True
local_known_beams['robust'] = robust
local_known_beams['uvtaper'] = uvtaper
valid_data[(field, intent)] = False
valid_vis_list = []
valid_scanids_list = []
valid_real_spwid_list = []
valid_virtual_spwid_list = []
valid_antenna_ids_list = []
# select data to be imaged
for vis in self.vislist:
antenna_ids = self.antenna_ids(intent, [os.path.basename(vis)])
valid_data_for_vis = False
valid_real_spwid_list_for_vis = []
valid_virtual_spwid_list_for_vis = []
ms = self.observing_run.get_ms(name=vis)
scan_dos = [scan for scan in ms.scans
if intent in scan.intents
and field in {f.name for f in scan.fields}]
scanids = ','.join({str(scan.id) for scan in scan_dos})
for spwid in spwids:
real_spwid = self.observing_run.virtual2real_spw_id(spwid, ms)
# continue if the spw was not used for these scans
if not [scan for scan in scan_dos if real_spwid in {spw.id for spw in scan.spws}]:
continue
try:
taql = '||'.join(['ANTENNA1==%d' % i for i in antenna_ids[os.path.basename(vis)]])
rtn = casa_tools.imager.selectvis(vis=vis,
field=field, spw=real_spwid, scan=scanids,
taql=taql, usescratch=False, writeaccess=False)
if rtn is True:
# flag to say that imager has some valid data to work
# on
valid_data_for_vis = True
valid_data[(field, intent)] = True
valid_real_spwid_list_for_vis.append(real_spwid)
valid_virtual_spwid_list_for_vis.append(spwid)
except:
pass
if valid_data_for_vis:
valid_vis_list.append(vis)
valid_scanids_list.append(scanids)
valid_real_spwid_list.append(','.join(map(str, valid_real_spwid_list_for_vis)))
valid_virtual_spwid_list.append(','.join(map(str, valid_virtual_spwid_list_for_vis)))
valid_antenna_ids_list.append(','.join(map(str, antenna_ids[os.path.basename(vis)])))
if not valid_data[(field, intent)]:
# no point carrying on for this field/intent
LOG.debug('No data for field %s' % (field))
utils.set_nested_dict(local_known_beams,
(field, intent, ','.join(map(str, sorted(spwids))), 'beam'),
'invalid')
continue
# use imager.advise to get the maximum cell size
aipsfieldofview = '%4.1farcsec' % (2.0 * largest_primary_beam_size)
rtn = casa_tools.imager.advise(takeadvice=False, amplitudeloss=0.5, fieldofview=aipsfieldofview)
casa_tools.imager.done()
if not rtn[0]:
# advise can fail if all selected data are flagged
# - not documented but assuming bool in first field of returned
# record indicates success or failure
LOG.warning('imager.advise failed for field/intent %s/%s spw %s - no valid data?'
% (field, intent, spwid))
valid_data[(field, intent)] = False
utils.set_nested_dict(local_known_beams,
(field, intent, ','.join(map(str, sorted(spwids))), 'beam'),
'invalid')
else:
cellv = qaTool.convert(rtn[2], 'arcsec')['value']
cellu = 'arcsec'
cellv /= (0.5 * pixperbeam)
# Now get better estimate from makePSF
tmp_psf_filename = str(uuid.uuid4())
gridder = self.gridder(intent, field)
mosweight = self.mosweight(intent, field)
field_ids = self.field(intent, field, vislist=valid_vis_list)
# Get single field imsize
imsize_sf = self.imsize(fields=field_ids, cell=['%.2g%s' % (cellv, cellu)], primary_beam=largest_primary_beam_size, centreonly=True, vislist=valid_vis_list)
# If it is a mosaic, adjust the size to be somewhat larger than one PB, but not the full
# mosaic size to limit the makePSF run times. The resulting beam is still very close to
# what tclean calculates later in the full mosaic size cleaning.
if gridder == 'mosaic':
imsize_mosaic = self.imsize(fields=field_ids, cell=['%.2g%s' % (cellv, cellu)], primary_beam=largest_primary_beam_size, vislist=valid_vis_list)
nxpix_sf, nypix_sf = imsize_sf
nxpix_mosaic, nypix_mosaic = imsize_mosaic
if nxpix_mosaic <= 2.0 * nxpix_sf and nypix_mosaic <= 2.0 * nypix_sf:
imsize = imsize_mosaic
else:
suTool = casa_tools.synthesisutils
nxpix = suTool.getOptimumSize(int(2.0 * nxpix_sf))
nypix = suTool.getOptimumSize(int(2.0 * nypix_sf))
suTool.done()
imsize = [nxpix, nypix]
else:
imsize = imsize_sf
if self.is_eph_obj(field):
phasecenter = 'TRACKFIELD'
else:
phasecenter = self.phasecenter(field_ids, vislist=valid_vis_list, shift_to_nearest_field=shift,
primary_beam=largest_primary_beam_size, intent=intent)
do_parallel = mpihelpers.parse_mpi_input_parameter(parallel)
paramList = ImagerParameters(msname=valid_vis_list,
scan=valid_scanids_list,
antenna=valid_antenna_ids_list,
spw=valid_real_spwid_list,
field=field,
phasecenter=phasecenter,
imagename=tmp_psf_filename,
imsize=imsize,
cell='%.2g%s' % (cellv, cellu),
gridder=gridder,
mosweight=mosweight,
weighting='briggs',
robust=robust,
uvtaper=uvtaper,
specmode='mfs',
conjbeams=False,
psterm=False,
mterm=False,
dopbcorr=False,
parallel=do_parallel
)
if do_parallel:
makepsf_imager = PyParallelContSynthesisImager(params=paramList)
else:
makepsf_imager = PySynthesisImager(params=paramList)
makepsf_imager.initializeImagers()
makepsf_imager.initializeNormalizers()
makepsf_imager.setWeighting()
makepsf_imager.makePSF()
makepsf_imager.deleteTools()
with casa_tools.ImageReader('%s.psf' % (tmp_psf_filename)) as image:
# Avoid bad PSFs
if all(qaTool.getvalue(qaTool.convert(image.restoringbeam()['minor'], 'arcsec')) > 1e-5):
restoringbeam = image.restoringbeam()
# CAS-11193: Round to 3 digits to avoid confusion when comparing
# heuristics against the beam weblog display (also using 3 digits)
restoringbeam_major_rounded = float('%.3g' % (qaTool.getvalue(qaTool.convert(restoringbeam['major'], 'arcsec'))))
restoringbeam_minor_rounded = float('%.3g' % (qaTool.getvalue(qaTool.convert(restoringbeam['minor'], 'arcsec'))))
restoringbeam_rounded = {'major': {'value': restoringbeam_major_rounded, 'unit': 'arcsec'},
'minor': {'value': restoringbeam_minor_rounded, 'unit': 'arcsec'},
'positionangle': restoringbeam['positionangle']}
makepsf_beams.append(restoringbeam_rounded)
utils.set_nested_dict(local_known_beams,
(field, intent, ','.join(map(str, sorted(spwids))), 'beam'),
restoringbeam_rounded)
else:
utils.set_nested_dict(local_known_beams,
(field, intent, ','.join(map(str, sorted(spwids))), 'beam'),
'invalid')
tmp_psf_images = glob.glob('%s.*' % tmp_psf_filename)
for tmp_psf_image in tmp_psf_images:
shutil.rmtree(tmp_psf_image)
finally:
casa_tools.imager.done()
if makepsf_beams != []:
# beam that's good for all field/intents
smallest_beam = {'minor': '1e9arcsec', 'major': '1e9arcsec', 'positionangle': '0.0deg'}
for beam in makepsf_beams:
bmin_v = qaTool.getvalue(qaTool.convert(beam['minor'], 'arcsec'))
if bmin_v < qaTool.getvalue(qaTool.convert(smallest_beam['minor'], 'arcsec')):
smallest_beam = beam
else:
smallest_beam = 'invalid'
return smallest_beam, copy.deepcopy(local_known_beams)
[docs] def cell(self, beam, pixperbeam=5.0):
"""Calculate cell size."""
cqa = casa_tools.quanta
try:
cell_size = cqa.getvalue(cqa.convert(beam['minor'], 'arcsec')) / pixperbeam
return ['%.2garcsec' % (cell_size)]
except:
return ['invalid']
[docs] def nchan_and_width(self, field_intent, spwspec):
if field_intent == 'TARGET':
# get the spwids in spwspec
p = re.compile(r"[ ,]+(\d+)")
spwids = p.findall(' %s' % spwspec)
spwids = list(set(map(int, spwids)))
# Use the first spw for the time being. TBD if this needs to be improved.
msname = self.vislist[0]
real_spwid = self.observing_run.virtual2real_spw_id(spwids[0], self.observing_run.get_ms(msname))
ms = self.observing_run.get_ms(name=msname)
spw = ms.get_spectral_window(real_spwid)
bandwidth = spw.bandwidth
chan_widths = [c.getWidth() for c in spw.channels]
# Currently imaging with a channel width of 15 MHz or the native
# width if larger and at least 8 channels
image_chan_width = measures.Frequency('15e6', measures.FrequencyUnits.HERTZ)
min_nchan = 8
if (any([chan_width >= image_chan_width for chan_width in chan_widths])):
nchan = -1
width = ''
else:
if (bandwidth >= min_nchan * image_chan_width):
nchan = int(utils.round_half_up(float(bandwidth.to_units(measures.FrequencyUnits.HERTZ)) /
float(image_chan_width.to_units(measures.FrequencyUnits.HERTZ))))
width = str(image_chan_width)
else:
nchan = min_nchan
width = str(bandwidth / nchan)
else:
nchan = -1
width = ''
return nchan, width
[docs] def has_data(self, field_intent_list, spwspec, vislist=None):
if vislist is None:
vislist = self.vislist
# reset state of imager
casa_tools.imager.done()
# put code in try-block to ensure that imager.done gets
# called at the end
valid_data = {}
try:
# select data to be imaged
for field_intent in field_intent_list:
valid_data[field_intent] = False
for vis in vislist:
ms = self.observing_run.get_ms(name=vis)
scanids = [str(scan.id) for scan in ms.scans if
field_intent[1] in scan.intents and
field_intent[0] in [fld.name for fld in scan.fields]]
if scanids != []:
scanids = ','.join(scanids)
real_spwspec = ','.join([str(self.observing_run.virtual2real_spw_id(spwid, ms)) for spwid in spwspec.split(',')])
try:
antenna_ids = self.antenna_ids(field_intent[1], [os.path.basename(vis)])
taql = '||'.join(['ANTENNA1==%d' % i for i in antenna_ids[os.path.basename(vis)]])
rtn = casa_tools.imager.selectvis(vis=vis, field=field_intent[0],
taql=taql, spw=real_spwspec,
scan=scanids, usescratch=False, writeaccess=False)
if rtn is True:
aipsfieldofview = '%4.1farcsec' % (2.0 * self.largest_primary_beam_size(spwspec, field_intent[1]))
# Need to run advise to check if the current selection is completely flagged
rtn = casa_tools.imager.advise(takeadvice=False, amplitudeloss=0.5,
fieldofview=aipsfieldofview)
casa_tools.imager.done()
if rtn[0]:
valid_data[field_intent] = True
except:
pass
if not valid_data[field_intent]:
LOG.debug('No data for SpW %s field %s' %
(spwspec, field_intent[0]))
finally:
casa_tools.imager.done()
return valid_data
[docs] def gridder(self, intent, field):
# the field heuristic which decides whether this is a mosaic or not
# and sets self._mosaic (a bit convoluted...)
self.field(intent, field)
# also need to use mosaic gridder when gridding antennas with different
# diameters (only for calibrators)
if self._mosaic or (len(self.antenna_diameters()) > 1 and intent != 'TARGET'):
# Setting this here because it is used in other places in the heuristics
# TODO: this is flaky since it requires "gridder" to be called before
# other methods using self._mosaic
self._mosaic = True
return 'mosaic'
else:
return 'standard'
[docs] def phasecenter(self, fields, centreonly=True, vislist=None, shift_to_nearest_field=False,
primary_beam=None, intent='TARGET'):
cme = casa_tools.measures
cqa = casa_tools.quanta
if vislist is None:
vislist = self.vislist
mdirections = []
sdirections = []
field_names = []
for ivis, vis in enumerate(vislist):
# Get the visibilities
ms = self.observing_run.get_ms(name=vis)
visfields = fields[ivis]
if visfields == '':
continue
visfields = visfields.split(',')
visfields = list(map(int, visfields))
# Get the phase directions and convert to ICRS
for field in visfields:
# get field centres as measures
fieldobj = ms.get_fields(field_id=field)[0]
ref = cme.getref(fieldobj.mdirection)
if ref == 'ICRS' or ref == 'J2000' or ref == 'B1950':
phase_dir = cme.measure(fieldobj.mdirection, 'ICRS')
else:
phase_dir = fieldobj.mdirection
mdirections.append(phase_dir)
# Store source direction for ephemeris objects
# for later correction of mosaic center position.
if self.is_eph_obj(fieldobj.name):
ref = cme.getref(fieldobj.source.direction)
if ref == 'ICRS' or ref == 'J2000' or ref == 'B1950':
phase_dir = cme.measure(fieldobj.source.direction, 'ICRS')
else:
phase_dir = fieldobj.source.direction
sdirections.append(phase_dir)
field_names.append(fieldobj.name)
# Correct field directions for ephemeris source motion
if sdirections != []:
sdirection_m0_rad = []
sdirection_m1_rad = []
for sdirection in sdirections:
sdirection_m0_rad.append(cqa.getvalue(cqa.convert(sdirection['m0'], 'rad')))
sdirection_m1_rad.append(cqa.getvalue(cqa.convert(sdirection['m1'], 'rad')))
avg_sdirection_m0_rad = np.average(sdirection_m0_rad)
avg_sdirection_m1_rad = np.average(sdirection_m1_rad)
for i in range(len(mdirections)):
mdirections[i]['m0'] = cqa.sub(mdirections[i]['m0'], cqa.sub(sdirections[i]['m0'], cqa.quantity(avg_sdirection_m0_rad, 'rad')))
mdirections[i]['m1'] = cqa.sub(mdirections[i]['m1'], cqa.sub(sdirections[i]['m1'], cqa.quantity(avg_sdirection_m1_rad, 'rad')))
# sanity check - for single field images the field centres from
# different measurement sets should be coincident and can
# collapse the list
# Set the maximum separation to 200 microarcsec and test via
# a tolerance rather than an equality.
max_separation = cqa.quantity('200uarcsec')
if not self._mosaic:
max_separation_uarcsec = cqa.getvalue(cqa.convert(max_separation, 'uarcsec'))[0] # in micro arcsec
for mdirection in mdirections:
separation = cme.separation(mdirection, mdirections[0])
if cqa.gt(separation, max_separation):
separation_arcsec = cqa.getvalue(cqa.convert(separation, 'arcsec'))[0]
LOG.warning('The separation between %s field centers across EBs is %f arcseconds (larger than the limit of %.1f microarcseconds). This is only normal for an ephemeris source or a source with a large proper motion or parallax.' % (field_names[0], separation_arcsec, max_separation_uarcsec))
mdirections = [mdirections[0]]
# it should be easy to calculate some 'average' direction
# from the contributing fields but it doesn't seem to be
# at the moment - no conversion between direction measures,
# no calculation of a direction from a direction and an
# offset. Consequently, what follows is a bit crude.
# First, find the offset of all field from field 0.
xsep = []
ysep = []
for mdirection in mdirections:
pa = cme.posangle(mdirections[0], mdirection)
sep = cme.separation(mdirections[0], mdirection)
xs = cqa.mul(sep, cqa.sin(pa))
ys = cqa.mul(sep, cqa.cos(pa))
xs = cqa.convert(xs, 'arcsec')
ys = cqa.convert(ys, 'arcsec')
xsep.append(cqa.getvalue(xs))
ysep.append(cqa.getvalue(ys))
xsep = np.array(xsep)
ysep = np.array(ysep)
xspread = xsep.max() - xsep.min()
yspread = ysep.max() - ysep.min()
xcen = xsep.min() + xspread / 2.0
ycen = ysep.min() + yspread / 2.0
# get direction of image centre crudely by adding offset
# of centre to ref values of first field.
ref = cme.getref(mdirections[0])
md = cme.getvalue(mdirections[0])
m0 = cqa.quantity(md['m0'])
m1 = cqa.quantity(md['m1'])
m0 = cqa.add(m0, cqa.div('%sarcsec' % xcen, cqa.cos(m1)))
m1 = cqa.add(m1, '%sarcsec' % ycen)
# convert to strings (CASA 4.0 returns as list for some reason
# hence 0 index)
if ref == 'ICRS' or ref == 'J2000' or ref == 'B1950':
m0 = cqa.time(m0, prec=10)[0]
else:
m0 = cqa.angle(m0, prec=9)[0]
m1 = cqa.angle(m1, prec=9)[0]
center = cme.direction(ref, m0, m1)
phase_center = '%s %s %s' % (ref, m0, m1)
# if the image center is outside of the mosaic pointings, shift to the nearest field
if shift_to_nearest_field:
nearest_field_to_center = self.center_field_ids(vislist, field_names[0], intent, phase_center)[0]
ms = self.observing_run.get_ms(name=vislist[0])
nearest = ms.get_fields(field_id=nearest_field_to_center)[0].mdirection
if primary_beam:
pb_dist = 0.408
if cqa.getvalue(cqa.convert(cme.separation(center, nearest), 'arcsec'))[0] > pb_dist * primary_beam:
LOG.info('The nearest pointing is > {pb_dist}pb away from image center. '
'Shifting the phase center to the '
'nearest field (id = {nf})'.format(pb_dist=pb_dist, nf=nearest_field_to_center))
LOG.info('Old phasecenter: {}'.format(phase_center))
# convert to strings (CASA 4.0 returns as list for some reason hence 0 index)
m0 = cme.getvalue(nearest)['m0']
m1 = cme.getvalue(nearest)['m1']
if ref == 'ICRS' or ref == 'J2000' or ref == 'B1950':
m0 = cqa.time(m0, prec=10)[0]
else:
m0 = cqa.angle(m0, prec=9)[0]
m1 = cqa.angle(m1, prec=9)[0]
phase_center = '%s %s %s' % (ref, m0, m1)
LOG.info('New phasecenter: {}'.format(phase_center))
LOG.warning('Source {src} is an odd-shaped mosaic -- there is no mosaic field at the image '
'phasecenter and imaging is likely to fail. The phasecenter of nearest pointing to '
'the image center is {pc}'.format(src=field_names[0], pc=phase_center))
else:
LOG.warning('No primary beam supplied. Will not attempt to shift phasecenter to '
'nearest field w/o a primary beam distance check.')
if centreonly:
return phase_center
else:
return phase_center, xspread, yspread
[docs] def field(self, intent, field, exclude_intent=None, vislist=None):
if vislist is None:
vislist = self.vislist
result = []
nfields_list = []
for vis in vislist:
ms = self.observing_run.get_ms(name=vis)
fields = ms.fields
field_list = []
if field is not None:
# field set explicitly
if not isinstance(field, list):
field_names = [field]
else:
field_names = field
# convert to field ids
for field_name in field_names:
field_list += [fld.id for fld in fields if
field_name.replace(' ', '') == fld.name.replace(' ', '')]
if intent is not None:
# pattern matching to allow intents of form *TARGET* to work
re_intent = intent.replace('*', '.*')
if exclude_intent is not None:
re_exclude_intent = exclude_intent.replace('*', '.*')
field_list = [fld.id for fld in fields if
fld.id in field_list and re_intent in fld.intents and re_exclude_intent not in fld.intents]
else:
field_list = [fld.id for fld in fields if
fld.id in field_list and re_intent in fld.intents]
nfields_list.append(len([fld.id for fld in fields if fld.id in field_list and re_intent in fld.intents]))
field_string = ','.join(str(fld_id) for fld_id in field_list)
result.append(field_string)
# this will be a mosaic if there is more than 1 field_id for any
# measurement set
self._mosaic = (np.array(nfields_list) > 1).any()
return result
[docs] def is_eph_obj(self, field):
ms = None
for msname in self.vislist:
ms = self.observing_run.get_ms(msname)
if field in [f.name for f in ms.fields]:
break
if ms is None:
LOG.error('Image Heuristics Ephemeris Object Check: Field %s not found.' % (field))
raise Exception('Image Heuristics Ephemeris Object Check: Field %s not found.' % (field))
else:
source_name = ms.get_fields(field)[0].source.name
is_eph_obj = False
for source in ms.sources:
if (source.name == source_name) and (source.is_eph_obj):
is_eph_obj = True
return is_eph_obj
[docs] def aggregate_bandwidth(self, spwids=None):
if spwids is None:
local_spwids = self.spwids
else:
local_spwids = spwids
msname = self.vislist[0]
ms = self.observing_run.get_ms(name=msname)
spw_frequency_ranges = []
for spwid in local_spwids:
real_spwid = self.observing_run.virtual2real_spw_id(spwid, ms)
spw = ms.get_spectral_window(real_spwid)
min_frequency_Hz = float(spw.min_frequency.convert_to(measures.FrequencyUnits.HERTZ).value)
max_frequency_Hz = float(spw.max_frequency.convert_to(measures.FrequencyUnits.HERTZ).value)
spw_frequency_ranges.append([min_frequency_Hz, max_frequency_Hz])
aggregate_bandwidth_Hz = np.sum([r[1] - r[0] for r in utils.merge_ranges(spw_frequency_ranges)])
return {'value': aggregate_bandwidth_Hz, 'unit': 'Hz'}
[docs] def representative_target(self):
cqa = casa_tools.quanta
repr_ms = self.observing_run.get_ms(self.vislist[0])
repr_target = repr_ms.representative_target
reprBW_mode = 'nbin'
if repr_target != (None, None, None) and repr_target[0] != 'none':
real_repr_target = True
repr_freq = repr_target[1]
# Get representative source and spw
repr_source, repr_spw = repr_ms.get_representative_source_spw()
# Check if representative bandwidth is larger than spw bandwidth. If so, switch to fullcont.
repr_spw_obj = repr_ms.get_spectral_window(repr_spw)
repr_spw_bw = cqa.quantity(float(repr_spw_obj.bandwidth.convert_to(measures.FrequencyUnits.HERTZ).value), 'Hz')
cont_spw_ids = list(self.observing_run.virtual_science_spw_ids.keys())
agg_bw = self.aggregate_bandwidth(cont_spw_ids)
if cqa.gt(repr_target[2], cqa.mul(agg_bw, 0.9)):
reprBW_mode = 'all_spw'
elif cqa.gt(repr_target[2], repr_spw_bw) and cqa.le(repr_target[2], cqa.mul(agg_bw, 0.9)):
reprBW_mode = 'multi_spw'
elif cqa.gt(repr_target[2], cqa.mul(repr_spw_bw, 0.2)):
reprBW_mode = 'repr_spw'
science_goals = self.observing_run.get_measurement_sets()[0].science_goals
# Check if there is a non-zero min/max angular resolution
minAcceptableAngResolution = cqa.convert(self.proj_params.min_angular_resolution, 'arcsec')
maxAcceptableAngResolution = cqa.convert(self.proj_params.max_angular_resolution, 'arcsec')
if cqa.getvalue(minAcceptableAngResolution) == 0.0 or cqa.getvalue(maxAcceptableAngResolution) == 0.0:
desired_angular_resolution = cqa.convert(self.proj_params.desired_angular_resolution, 'arcsec')
if cqa.getvalue(desired_angular_resolution) != 0.0:
minAcceptableAngResolution = cqa.mul(desired_angular_resolution, 0.8)
maxAcceptableAngResolution = cqa.mul(desired_angular_resolution, 1.2)
else:
minAcceptableAngResolution = cqa.convert(science_goals['minAcceptableAngResolution'], 'arcsec')
maxAcceptableAngResolution = cqa.convert(science_goals['maxAcceptableAngResolution'], 'arcsec')
maxAllowedBeamAxialRatio = science_goals['maxAllowedBeamAxialRatio']
# Check if there is a non-zero sensitivity goal
sensitivityGoal = cqa.convert(self.proj_params.desired_sensitivity, 'mJy')
if cqa.getvalue(sensitivityGoal) == 0.0:
sensitivityGoal = cqa.convert(science_goals['sensitivity'], 'mJy')
else:
real_repr_target = False
# Fall back to existing source
target_sources = [s.name for s in repr_ms.sources if 'TARGET' in s.intents]
if target_sources == []:
if repr_target[0] == 'none':
LOG.warning('No TARGET intent found. Trying to select a representative source from calibration intents.')
for repr_intent in ['PHASE', 'CHECK', 'AMPLITUDE', 'FLUX', 'BANDPASS']:
target_sources = [s.name for s in repr_ms.sources if repr_intent in s.intents]
if target_sources != []:
break
if target_sources == []:
raise Exception('Cannot select any representative target from calibration intents.')
else:
raise Exception('Cannot select any representative target from TARGET intent.')
repr_source = target_sources[0]
repr_spw_obj = repr_ms.get_spectral_windows()[0]
repr_spw = repr_spw_obj.id
repr_chan_obj = repr_spw_obj.channels[int(repr_spw_obj.num_channels // 2)]
repr_freq = cqa.quantity(float(repr_chan_obj.getCentreFrequency().convert_to(measures.FrequencyUnits.HERTZ).value), 'Hz')
repr_bw = cqa.quantity(float(repr_chan_obj.getWidth().convert_to(measures.FrequencyUnits.HERTZ).value), 'Hz')
repr_target = (repr_source, repr_freq, repr_bw)
minAcceptableAngResolution = '0.0arcsec'
maxAcceptableAngResolution = '0.0arcsec'
maxAllowedBeamAxialRatio = '0.0'
sensitivityGoal = '0.0mJy'
LOG.info('Image Heuristics: No representative target found. Choosing %s SPW %d.' % (repr_source, repr_spw))
virtual_repr_spw = self.observing_run.real2virtual_spw_id(repr_spw, repr_ms)
return repr_target, repr_source, virtual_repr_spw, repr_freq, reprBW_mode, real_repr_target, minAcceptableAngResolution, maxAcceptableAngResolution, maxAllowedBeamAxialRatio, sensitivityGoal
[docs] def imsize(self, fields, cell, primary_beam, sfpblimit=None, max_pixels=None,
centreonly=False, vislist=None, spwspec=None):
"""
Image size heuristics for single fields and mosaics. The pixel count along x and y image dimensions
is determined by the cell size, primary beam size and the spread of phase centers in case of mosaics.
:param fields: list of comma separated strings of field IDs per MS.
:param cell: pixel (cell) size in arcsec.
:param primary_beam: primary beam width in arcsec.
:param sfpblimit: single field primary beam response. If provided then imsize is chosen such that the image
edge is at normalised primary beam level equals to sfpblimit.
:param max_pixels: maximum allowed pixel count, integer. The same limit is applied along both image axes.
:param centreonly: if True, then ignore the spread of field centers.
:param vislist: list of visibility path string to be used for imaging. If not set then use all visibilities
in the context.
:param spwspec: ID list of spectral windows used to create image product. List or string containing comma
separated spw IDs list. Not used in the base method.
:return: two element list of pixel count along x and y image axes.
"""
if vislist is None:
vislist = self.vislist
# get spread of beams
if centreonly:
xspread = yspread = 0.0
else:
ignore, xspread, yspread = self.phasecenter(fields, centreonly=centreonly, vislist=vislist)
cqa = casa_tools.quanta
cellx = cell[0]
if len(cell) > 1:
celly = cell[1]
else:
celly = cell[0]
if (cellx == 'invalid') or (celly == 'invalid'):
return [0, 0]
# get cell and beam sizes in arcsec
cellx_v = cqa.getvalue(cqa.convert(cellx, 'arcsec'))
celly_v = cqa.getvalue(cqa.convert(celly, 'arcsec'))
beam_radius_v = primary_beam
# set size of image to spread of field centres plus a
# border of 0.75 (0.825) * beam radius (radius is to
# first null) wide
nfields = int(np.median([len(field_ids.split(',')) for field_ids in fields]))
if self._mosaic and nfields <= 3:
# PIPE-209 asks for a slightly larger size for small (2-3 field) mosaics.
nxpix = int((1.65 * beam_radius_v + xspread) / cellx_v)
nypix = int((1.65 * beam_radius_v + yspread) / celly_v)
else:
nxpix = int((1.5 * beam_radius_v + xspread) / cellx_v)
nypix = int((1.5 * beam_radius_v + yspread) / celly_v)
if (not self._mosaic) and (sfpblimit is not None):
beam_fwhp = 1.12 / 1.22 * beam_radius_v
nxpix = int(utils.round_half_up(1.1 * beam_fwhp * math.sqrt(-math.log(sfpblimit) / math.log(2.)) / cellx_v))
nypix = int(utils.round_half_up(1.1 * beam_fwhp * math.sqrt(-math.log(sfpblimit) / math.log(2.)) / celly_v))
if max_pixels is not None:
nxpix = min(nxpix, max_pixels)
nypix = min(nypix, max_pixels)
# set nxpix, nypix to next highest 'composite number'
suTool = casa_tools.synthesisutils
nxpix = suTool.getOptimumSize(nxpix)
nypix = suTool.getOptimumSize(nypix)
suTool.done()
return [nxpix, nypix]
[docs] def imagename(self, output_dir=None, intent=None, field=None, spwspec=None, specmode=None, band=None):
try:
nameroot = self.imagename_prefix
if nameroot == 'unknown':
nameroot = 'oussid'
# need to sanitize the nameroot here because when it's added
# to filenamer as an asdm, os.path.basename is run on it with
# undesirable results.
nameroot = filenamer.sanitize(nameroot)
except:
nameroot = 'oussid'
namer = filenamer.Image()
namer._associations.asdm(nameroot)
if output_dir:
namer.output_dir(output_dir)
namer.stage('STAGENUMBER')
if intent:
namer.intent(intent)
if field:
namer.source(field)
if spwspec:
# find all the spwids present in the list
p = re.compile(r"[ ,]+(\d+)")
spwids = p.findall(' %s' % spwspec)
spw = '_'.join(map(str, sorted(map(int, set(spwids)))))
namer.spectral_window(spw)
if specmode:
namer.specmode(specmode)
# filenamer returns a sanitized filename (i.e. one with
# illegal characters replace by '_'), no need to check
# the name components individually.
imagename = namer.get_filename()
return imagename
[docs] def width(self, spwid):
msname = self.vislist[0]
real_spwid = self.observing_run.virtual2real_spw_id(spwid, self.observing_run.get_ms(msname))
ms = self.observing_run.get_ms(name=msname)
spw = ms.get_spectral_window(real_spwid)
# negative widths appear to confuse clean,
if spw.channels[1].high > spw.channels[0].high:
width = spw.channels[1].high - spw.channels[0].high
else:
width = spw.channels[0].high - spw.channels[1].high
# increase width slightly to try to avoid the error:
# WARN SubMS::convertGridPars *** Requested new channel width is
# smaller than smallest original channel width.
# This should no longer be necessary (CASA >=4.7) and this width
# method does not seem to be used anywhere anyways.
# width = decimal.Decimal('1.0001') * width
width = str(width)
return width
[docs] def ncorr(self, spwid):
msname = self.vislist[0]
real_spwid = self.observing_run.virtual2real_spw_id(spwid, self.observing_run.get_ms(msname))
ms = self.observing_run.get_ms(name=msname)
spw = ms.get_spectral_window(real_spwid)
# Get the data description for this spw
dd = ms.get_data_description(spw=spw)
if dd is None:
LOG.debug('Missing data description for spw %s ' % spwid)
return 0
# Determine the number of correlations
# Check that they are between 1 and 4
ncorr = len(dd.corr_axis)
if ncorr not in {1, 2, 4}:
LOG.debug('Wrong number of correlations %s for spw %s ' % (ncorr, spwid))
return 0
return ncorr
[docs] def pblimits(self, pb):
pblimit_image = 0.2
pblimit_cleanmask = 0.3
if (pb not in [None, '']):
try:
iaTool = casa_tools.image
iaTool.open(pb)
nx, ny, np, nf = iaTool.shape()
# First check if the center edge is masked. If so, then the
# default pb level of 0.2 is fully within the image and no
# adjustment is needed.
if not iaTool.getchunk([nx // 2, 0, 0, nf // 2], [nx // 2, 0, 0, nf // 2], getmask=True).flatten()[0]:
return pblimit_image, pblimit_cleanmask
pb_edge = 0.0
i_pb_edge = -1
# There are cases with zero edged PBs (due to masking),
# check for first unmasked value.
# Should no longer encounter the mask here due to the
# above check, but keep code around for now.
while pb_edge == 0.0 and i_pb_edge < ny // 2:
i_pb_edge += 1
pb_edge = iaTool.getchunk([nx//2, i_pb_edge, 0, nf//2], [nx//2, i_pb_edge, 0, nf//2]).flatten()[0]
if (pb_edge > 0.2):
i_pb_image = max(i_pb_edge, int(0.05 * ny))
pblimit_image = iaTool.getchunk([nx//2, i_pb_image, 0, nf//2], [nx//2, i_pb_image, 0, nf//2]).flatten()[0]
i_pb_cleanmask = i_pb_image + int(0.05 * ny)
pblimit_cleanmask = iaTool.getchunk([nx//2, i_pb_cleanmask, 0, nf//2], [nx//2, i_pb_cleanmask, 0, nf//2]).flatten()[0]
except Exception as e:
LOG.warning('Could not analyze PB: %s. Using default pblimit values.' % (e))
finally:
iaTool.close()
return pblimit_image, pblimit_cleanmask
[docs] def deconvolver(self, specmode, spwspec):
if (specmode == 'cont'):
fr_bandwidth = self.get_fractional_bandwidth(spwspec)
if (fr_bandwidth > 0.1):
return 'mtmfs'
else:
return 'hogbom'
else:
return 'hogbom'
[docs] def get_min_max_freq(self, spwspec):
"""
Given a comma separated string list of spectral windows, determines the minimum and maximum
frequencies in spectral window list and the corresponding spectral window indexes.
:param spwspec: comma separated string list of spectral windows.
:return: dictionary min. and max. frequencies (in Hz) and corresponding spw indexes.
"""
abs_min_frequency = 1.0e15
abs_max_frequency = 0.0
min_freq_spwid = -1
max_freq_spwid = -1
msname = self.vislist[0]
ms = self.observing_run.get_ms(name=msname)
for spwid in spwspec.split(','):
real_spwid = self.observing_run.virtual2real_spw_id(spwid, self.observing_run.get_ms(msname))
spw = ms.get_spectral_window(real_spwid)
min_frequency = float(spw.min_frequency.to_units(measures.FrequencyUnits.HERTZ))
if (min_frequency < abs_min_frequency):
abs_min_frequency = min_frequency
min_freq_spwid = spwid
max_frequency = float(spw.max_frequency.to_units(measures.FrequencyUnits.HERTZ))
if (max_frequency > abs_max_frequency):
abs_max_frequency = max_frequency
max_freq_spwid = spwid
return {'abs_min_freq': abs_min_frequency, 'abs_max_freq': abs_max_frequency,
'min_freq_spwid': min_freq_spwid, 'max_freq_spwid': max_freq_spwid}
[docs] def get_fractional_bandwidth(self, spwspec):
"""Returns fractional bandwidth for selected spectral windows"""
freq_limits = self.get_min_max_freq(spwspec)
return 2.0 * (freq_limits['abs_max_freq'] - freq_limits['abs_min_freq']) / \
(freq_limits['abs_min_freq'] + freq_limits['abs_max_freq'])
[docs] def robust(self):
"""Default robust value."""
return 0.5
[docs] def center_field_ids(self, msnames, field, intent, phasecenter, exclude_intent=None):
"""Get per-MS IDs of field closest to the phase center."""
meTool = casa_tools.measures
qaTool = casa_tools.quanta
ref_field_ids = []
# Phase center coordinates
pc_direc = meTool.source(phasecenter)
for msname in msnames:
try:
ms_obj = self.observing_run.get_ms(msname)
if exclude_intent is None:
field_ids = [f.id for f in ms_obj.fields if (f.name == field) and (intent in f.intents)]
else:
field_ids = [f.id for f in ms_obj.fields if (f.name == field) and (intent in f.intents) and (exclude_intent not in f.intents)]
separations = [qaTool.getvalue(meTool.separation(pc_direc, f.mdirection)) for f in ms_obj.fields if f.id in field_ids]
ref_field_ids.append(field_ids[separations.index(min(separations))])
except:
ref_field_ids.append(-1)
return ref_field_ids
[docs] def calc_topo_ranges(self, inputs):
"""Calculate TOPO ranges for hif_tclean inputs."""
spw_topo_freq_param_lists = []
spw_topo_chan_param_lists = []
spw_topo_freq_param_dict = collections.defaultdict(dict)
spw_topo_chan_param_dict = collections.defaultdict(dict)
p = re.compile('([\d.]*)(~)([\d.]*)(\D*)')
total_topo_freq_ranges = []
topo_freq_ranges = []
num_channels = []
meTool = casa_tools.measures
qaTool = casa_tools.quanta
# Phase center coordinates
pc_direc = meTool.source(inputs.phasecenter)
# Get per-MS IDs of field closest to the phase center
ref_field_ids = self.center_field_ids(inputs.vis, inputs.field, inputs.intent, pc_direc)
# Get a cont file handler for the conversion to TOPO
contfile_handler = contfilehandler.ContFileHandler(self.contfile)
aggregate_lsrk_bw = '0.0GHz'
msname = self.vislist[0]
ms = self.observing_run.get_ms(name=msname)
for spwid in inputs.spw.split(','):
real_spwid = self.observing_run.virtual2real_spw_id(spwid, self.observing_run.get_ms(msname))
spw_info = ms.get_spectral_window(real_spwid)
num_channels.append(spw_info.num_channels)
min_frequency = float(spw_info.min_frequency.to_units(measures.FrequencyUnits.GIGAHERTZ))
max_frequency = float(spw_info.max_frequency.to_units(measures.FrequencyUnits.GIGAHERTZ))
# Save spw width
total_topo_freq_ranges.append((min_frequency, max_frequency))
if 'spw%s' % (spwid) in inputs.spwsel_lsrk:
if (inputs.spwsel_lsrk['spw%s' % (spwid)] not in ['ALL', '', 'NONE']):
freq_selection, refer = inputs.spwsel_lsrk['spw%s' % (spwid)].split()
if (refer == 'LSRK'):
# Convert to TOPO
topo_freq_selections, topo_chan_selections, aggregate_spw_lsrk_bw = contfile_handler.lsrk_to_topo(inputs.spwsel_lsrk['spw%s' % (spwid)], inputs.vis, ref_field_ids, spwid, self.observing_run)
spw_topo_freq_param_lists.append(['%s:%s' % (spwid, topo_freq_selection.split()[0]) for topo_freq_selection in topo_freq_selections])
spw_topo_chan_param_lists.append(['%s:%s' % (spwid, topo_chan_selection.split()[0]) for topo_chan_selection in topo_chan_selections])
for i in range(len(inputs.vis)):
spw_topo_freq_param_dict[os.path.basename(inputs.vis[i])][spwid] = topo_freq_selections[i].split()[0]
spw_topo_chan_param_dict[os.path.basename(inputs.vis[i])][spwid] = topo_chan_selections[i].split()[0]
# Count only one selection !
for topo_freq_range in topo_freq_selections[0].split(';'):
f1, sep, f2, unit = p.findall(topo_freq_range)[0]
topo_freq_ranges.append((float(f1), float(f2)))
else:
LOG.warning('Cannot convert frequency selection properly to TOPO. Using plain ranges for all MSs.')
spw_topo_freq_param_lists.append(['%s:%s' % (spwid, freq_selection)] * len(inputs.vis))
# TODO: Need to derive real channel ranges
spw_topo_chan_param_lists.append(['%s:0~%s' % (spwid, spw_info.num_channels - 1)] * len(inputs.vis))
for i in range(len(inputs.vis)):
spw_topo_freq_param_dict[os.path.basename(inputs.vis[i])][spwid] = freq_selection.split()[0]
# TODO: Need to derive real channel ranges
spw_topo_chan_param_dict[os.path.basename(inputs.vis[i])][spwid] = '0~%d' % (spw_info.num_channels - 1)
# Count only one selection !
aggregate_spw_lsrk_bw = '0.0GHz'
for freq_range in freq_selection.split(';'):
f1, sep, f2, unit = p.findall(freq_range)[0]
topo_freq_ranges.append((float(f1), float(f2)))
delta_f = qaTool.sub('%s%s' % (f2, unit), '%s%s' % (f1, unit))
aggregate_spw_lsrk_bw = qaTool.add(aggregate_spw_lsrk_bw, delta_f)
else:
spw_topo_freq_param_lists.append([spwid] * len(inputs.vis))
spw_topo_chan_param_lists.append([spwid] * len(inputs.vis))
for msname in inputs.vis:
spw_topo_freq_param_dict[os.path.basename(msname)][spwid] = ''
spw_topo_chan_param_dict[os.path.basename(msname)][spwid] = ''
topo_freq_ranges.append((min_frequency, max_frequency))
aggregate_spw_lsrk_bw = '%.10fGHz' % (max_frequency - min_frequency)
if (inputs.spwsel_lsrk['spw%s' % (spwid)] != 'ALL') and (inputs.intent == 'TARGET') and (inputs.specmode in ('mfs', 'cont') and self.warn_missing_cont_ranges()):
LOG.warning('No continuum frequency selection for Target Field %s SPW %s' % (inputs.field, spwid))
else:
spw_topo_freq_param_lists.append([spwid] * len(inputs.vis))
spw_topo_chan_param_lists.append([spwid] * len(inputs.vis))
for msname in inputs.vis:
spw_topo_freq_param_dict[os.path.basename(msname)][spwid] = ''
spw_topo_chan_param_dict[os.path.basename(msname)][spwid] = ''
topo_freq_ranges.append((min_frequency, max_frequency))
aggregate_spw_lsrk_bw = '%.10fGHz' % (max_frequency - min_frequency)
if (inputs.intent == 'TARGET') and (inputs.specmode in ('mfs', 'cont') and self.warn_missing_cont_ranges()):
LOG.warning('No continuum frequency selection for Target Field %s SPW %s' % (inputs.field, spwid))
aggregate_lsrk_bw = qaTool.add(aggregate_lsrk_bw, aggregate_spw_lsrk_bw)
spw_topo_freq_param = [','.join(spwsel_per_ms) for spwsel_per_ms in [[spw_topo_freq_param_list_per_ms[i] for spw_topo_freq_param_list_per_ms in spw_topo_freq_param_lists] for i in range(len(inputs.vis))]]
spw_topo_chan_param = [','.join(spwsel_per_ms) for spwsel_per_ms in [[spw_topo_chan_param_list_per_ms[i] for spw_topo_chan_param_list_per_ms in spw_topo_chan_param_lists] for i in range(len(inputs.vis))]]
# Calculate total bandwidth
total_topo_bw = '0.0GHz'
for total_topo_freq_range in utils.merge_ranges(total_topo_freq_ranges):
total_topo_bw = qaTool.add(total_topo_bw, qaTool.sub('%.10fGHz' % (float(total_topo_freq_range[1])), '%.10fGHz' % (float(total_topo_freq_range[0]))))
# Calculate aggregate selected bandwidth
aggregate_topo_bw = '0.0GHz'
for topo_freq_range in utils.merge_ranges(topo_freq_ranges):
aggregate_topo_bw = qaTool.add(aggregate_topo_bw, qaTool.sub('%.10fGHz' % (float(topo_freq_range[1])), '%.10fGHz' % (float(topo_freq_range[0]))))
return spw_topo_freq_param, spw_topo_chan_param, spw_topo_freq_param_dict, spw_topo_chan_param_dict, total_topo_bw, aggregate_topo_bw, aggregate_lsrk_bw
[docs] def get_channel_flags(self, msname, field, spw):
"""
Determine channel flags for a given field and spw selection in one MS.
"""
# CAS-8997
#
# ms.selectinit(datadescid=x) is required to avoid the 'Data shape
# varies...' warning message. There's no guarantee that a single
# spectral window will resolve to a single data description, e.g.
# the polarisation setup may change. There's not enough
# information passed into this function to consistently identify the
# data description that should be specified so on occasion the
# warning message will reoccur.
#
# TODO refactor method signature to include data description ID
#
msDO = self.observing_run.get_ms(name=msname)
real_spwid = self.observing_run.virtual2real_spw_id(spw, self.observing_run.get_ms(msname))
spwDO = msDO.get_spectral_window(real_spwid)
data_description_ids = {dd.id for dd in msDO.data_descriptions if dd.spw is spwDO}
if len(data_description_ids) == 1:
dd_id = data_description_ids.pop()
else:
msg = 'Could not resolve {!s} spw {!s} to a single data description'.format(msname, spwDO.id)
LOG.warning(msg)
dd_id = 0
try:
with casa_tools.MSReader(msname) as msTool:
# CAS-8997 selectinit is required to avoid the 'Data shape varies...' warning message
msTool.selectinit(datadescid=dd_id)
# Antenna selection does not work (CAS-8757)
# staql={'field': field, 'spw': real_spwid, 'antenna': '*&*'}
staql = {'field': field, 'spw': str(real_spwid)}
msTool.msselect(staql)
msTool.iterinit(maxrows=500)
msTool.iterorigin()
channel_flags = np.array([True] * spwDO.num_channels)
iterating = True
while iterating:
flag_ants = msTool.getdata(['flag', 'antenna1', 'antenna2'])
# Calculate averaged flagging vector keeping all unflagged
# channels from any baseline.
if flag_ants != {}:
for i in range(flag_ants['flag'].shape[2]):
# Antenna selection does not work (CAS-8757)
if flag_ants['antenna1'][i] != flag_ants['antenna2'][i]:
for j in range(flag_ants['flag'].shape[0]):
channel_flags = np.logical_and(channel_flags, flag_ants['flag'][j, :, i])
iterating = msTool.iternext()
except Exception as e:
LOG.error('Exception while determining flags: %s' % e)
channel_flags = np.array([])
return channel_flags
[docs] def get_first_unflagged_channel_from_center(self, channel_flags):
"""
Find first unflagged channel from center channel.
"""
i_low = i_high = None
num_channels = len(channel_flags)
center_channel = int(num_channels // 2)
for i in range(center_channel, -1, -1):
if channel_flags[i] == False:
i_low = i
break
for i in range(center_channel, num_channels):
if channel_flags[i] == False:
i_high = i
break
if (i_low is None) and (i_high is None):
return None
elif (i_low is None):
return i_high
elif (i_high is None):
return i_low
else:
if (center_channel - i_low) < (i_high - center_channel):
return i_low
else:
return i_high
[docs] def freq_intersection(self, vis, field, intent, spw, frame='LSRK'):
"""
Calculate LSRK frequency intersection of a list of MSs for a
given field and spw. Exclude flagged channels.
"""
per_eb_flagged_freq_ranges = []
per_eb_full_freq_ranges = []
channel_widths = []
for msname in vis:
ms = self.observing_run.get_ms(name=msname)
real_spw = str(self.observing_run.virtual2real_spw_id(spw, ms))
# Loop over mosaic fields and determine the channel flags per field.
# Skip channels that have only partial pointing coverage.
field_ids = self.field(intent=intent, field=field, vislist=[msname])[0].split(',')
per_field_flagged_freq_ranges = []
per_field_full_freq_ranges = []
for field_id in field_ids:
# For multi-tuning EBs, a field may be observed in just a subset of
# science spws. Filter out spws not applicable to this field.
spw_do = ms.get_spectral_window(real_spw)
field_dos = ms.get_fields(field, intent=intent)
# field(s) not in MS
if not field_dos:
continue
# spw not observed for the field(s)
if not [f for f in field_dos if spw_do in f.valid_spws]:
continue
# Get the channel flags (uses virtual spw ID !)
channel_flags = self.get_channel_flags(msname, field_id, spw)
# Get indices of unflagged channels
nfi = np.where(channel_flags == False)[0]
# Use unflagged edge channels to determine LSRK frequency range
if nfi.shape != (0,):
# Use the edges. Another heuristic will skip one extra channel later in the final frequency range.
with casa_tools.SelectvisReader(msname, field=field_id,
spw='%s:%d~%d' % (real_spw, nfi[0], nfi[-1])) as imager:
result = imager.advisechansel(getfreqrange=True, freqframe=frame)
f0_flagged = result['freqstart']
f1_flagged = result['freqend']
per_field_flagged_freq_ranges.append((f0_flagged, f1_flagged))
# The frequency range from advisechansel is from channel edge
# to channel edge. To get the width, one needs to divide by the
# number of channels in the selection.
channel_widths.append((f1_flagged - f0_flagged) / (nfi[-1] - nfi[0] + 1))
# Also get the full ranges to trim the final LSRK range for
# odd tunings near the LO range edges (PIPE-526).
with casa_tools.SelectvisReader(msname, field=field_id, spw='%s' % (real_spw)) as imager:
result = imager.advisechansel(getfreqrange=True, freqframe=frame)
f0_full = result['freqstart']
f1_full = result['freqend']
per_field_full_freq_ranges.append((f0_full, f1_full))
# Calculate weighted intersection with threshold of 1.0 to avoid
# edge channels that have drifted too much in LSRK or REST during
# the EB.
if per_field_flagged_freq_ranges != []:
per_eb_flagged_intersect_range = utils.intersect_ranges_by_weight(per_field_flagged_freq_ranges, max(channel_widths), 1.0)
per_eb_full_intersect_range = utils.intersect_ranges_by_weight(per_field_full_freq_ranges, max(channel_widths), 1.0)
if per_eb_flagged_intersect_range != () and per_eb_full_intersect_range != ():
per_eb_flagged_freq_ranges.append(per_eb_flagged_intersect_range)
per_eb_full_freq_ranges.append(per_eb_full_intersect_range)
if per_eb_flagged_freq_ranges == []:
return -1, -1, 0
# Calculate weighted intersection with threshold of 0.6 to avoid
# partially flagged spws of individual EBs restricting the frequency
# axis (PIPE-180).
intersect_range_flagged = utils.intersect_ranges_by_weight(per_eb_flagged_freq_ranges, max(channel_widths), 0.6)
# Calculate the weighted intersection of all ranges with threshold
# of 1.0 to get the maximum possible range with contributions from
# all EBs (PIPE-526).
intersect_range_full = utils.intersect_ranges_by_weight(per_eb_full_freq_ranges, max(channel_widths), 1.0)
if intersect_range_flagged != () and intersect_range_full != ():
intersect_range_final = utils.intersect_ranges_by_weight([intersect_range_flagged, intersect_range_full], max(channel_widths), 1.0)
if0, if1 = intersect_range_final
return if0, if1, max(channel_widths)
else:
return -1, -1, 0
[docs] def warn_missing_cont_ranges(self):
return False
[docs] def get_bw_corr_factor(self, ms_do, spw, nchan):
"""
Calculate effective bandwidth correction factor.
"""
real_spwid = self.observing_run.virtual2real_spw_id(spw, ms_do)
spw_do = ms_do.get_spectral_window(real_spwid)
spwchan = spw_do.num_channels
physicalBW_of_1chan = float(spw_do.channels[0].getWidth().convert_to(measures.FrequencyUnits.HERTZ).value)
effectiveBW_of_1chan = float(spw_do.channels[0].effective_bw.convert_to(measures.FrequencyUnits.HERTZ).value)
BW_ratio = effectiveBW_of_1chan / physicalBW_of_1chan
if (BW_ratio <= 1.0):
N_smooth = 0
elif (utils.equal_to_n_digits(BW_ratio, 2.667, 4)):
N_smooth = 1
elif (utils.equal_to_n_digits(BW_ratio, 1.600, 4)):
N_smooth = 2
elif (utils.equal_to_n_digits(BW_ratio, 1.231, 4)):
N_smooth = 4
elif (utils.equal_to_n_digits(BW_ratio, 1.104, 4)):
N_smooth = 8
elif (utils.equal_to_n_digits(BW_ratio, 1.049, 4)):
N_smooth = 16
else:
LOG.warning('Could not evaluate channel bandwidths ratio. Physical: %s Effective: %s Ratio: %s' % (physicalBW_of_1chan, effectiveBW_of_1chan, BW_ratio))
N_smooth = 0
if N_smooth > 0 and nchan > 1:
optimisticBW = nchan * float(effectiveBW_of_1chan)
approximateEffectiveBW = (nchan + 1.12 * (spwchan - nchan) / spwchan / N_smooth) * float(physicalBW_of_1chan)
SCF = (optimisticBW / approximateEffectiveBW) ** 0.5
else:
SCF = 1.0
return SCF, physicalBW_of_1chan, effectiveBW_of_1chan
[docs] def calc_sensitivities(self, vis, field, intent, spw, nbin, spw_topo_chan_param_dict, specmode, gridder, cell, imsize, weighting, robust, uvtaper, center_only=False, known_sensitivities={}, force_calc=False):
"""Compute sensitivity estimate using CASA."""
cqa = casa_tools.quanta
# Need to work on a local copy of known_sensitivities to avoid setting the
# method default value inadvertently
local_known_sensitivities = copy.deepcopy(known_sensitivities)
sensitivities = []
eff_ch_bw = 0.0
sens_bws = {}
field_ids = self.field(intent, field, vislist=vis) # list of strings with comma separated IDs per MS
phasecenter = self.phasecenter(field_ids, vislist=vis) # string
center_field_ids = self.center_field_ids(vis, field, intent, phasecenter) # list of integer IDs per MS
for ms_index, msname in enumerate(vis):
ms = self.observing_run.get_ms(name=msname)
for intSpw in map(int, spw.split(',')):
try:
real_spwid = self.observing_run.virtual2real_spw_id(intSpw, self.observing_run.get_ms(msname))
spw_do = ms.get_spectral_window(real_spwid)
# For multi-tuning EBs, a field may be observed in just a subset of
# science spws. Filter out spws not applicable to this field.
field_dos = ms.get_fields(field, intent=intent)
# field(s) not in MS
if not field_dos:
continue
# spw not observed for the field(s)
if not [f for f in field_dos if spw_do in f.valid_spws]:
continue
sens_bws[intSpw] = 0.0
if (specmode == 'cube'):
# Use the center channel selection
if nbin != -1:
chansel = '%d~%d' % (int((spw_do.num_channels - nbin) / 2.0), int((spw_do.num_channels + nbin) / 2.0) - 1)
nchan_sel = nbin
else:
chansel = '%d~%d' % (int(spw_do.num_channels / 2.0), int(spw_do.num_channels / 2.0))
nchan_sel = 1
else:
if spw_topo_chan_param_dict.get(os.path.basename(msname), False):
if spw_topo_chan_param_dict[os.path.basename(msname)].get(str(intSpw), '') != '':
# Use continuum frequency selection
chansel = spw_topo_chan_param_dict[os.path.basename(msname)][str(intSpw)]
nchan_sel = np.sum([c[1]-c[0]+1 for c in [list(map(int, sel.split('~')))
for sel in chansel.split(';')]])
else:
# Use full spw (unflagged channels)
channel_flags = self.get_channel_flags(msname, field, intSpw)
nchan_unflagged = np.where(channel_flags == False)[0].shape[0]
# Dummy selection
chansel = '0~%d' % (nchan_unflagged - 1)
nchan_sel = nchan_unflagged
else:
# Use full spw (unflagged channels)
channel_flags = self.get_channel_flags(msname, field, intSpw)
nchan_unflagged = np.where(channel_flags == False)[0].shape[0]
# Dummy selection
chansel = '0~%d' % (nchan_unflagged - 1)
nchan_sel = nchan_unflagged
chansel_full = '0~%d' % (spw_do.num_channels - 1)
# If necessary, calculate center field full spw sensitivity
try:
calc_sens = False
if force_calc:
# Reset flag to avoid clearing the dictionary several times
force_calc = False
local_known_sensitivities = {}
LOG.info('Got manual request to recalculate sensitivities.')
raise Exception('Got manual request to recalculate sensitivities.')
if local_known_sensitivities['robust'] != robust:
LOG.info('robust value changed (old: %.1f, new: %.1f). Re-calculating sensitivities.' % (local_known_sensitivities['robust'], robust))
local_known_sensitivities = {}
raise Exception('robust value changed (old: %.1f, new: %.1f). Re-calculating sensitivities.' % (local_known_sensitivities['robust'], robust))
if local_known_sensitivities['uvtaper'] != uvtaper:
LOG.info('uvtaper value changed (old: %s, new: %s). Re-calculating sensitivities.' % (str(local_known_sensitivities['uvtaper']), str(uvtaper)))
local_known_sensitivities = {}
raise Exception('uvtaper value changed (old: %s, new: %s). Re-calculating sensitivities.' % (str(local_known_sensitivities['uvtaper']), str(uvtaper)))
center_field_full_spw_sensitivity = cqa.getvalue(cqa.convert(local_known_sensitivities[os.path.basename(msname)][field][intent][intSpw]['sensitivityAllChans'], 'Jy/beam'))[0]
nchan_unflagged = local_known_sensitivities[os.path.basename(msname)][field][intent][intSpw]['nchanUnflagged']
eff_ch_bw = cqa.getvalue(cqa.convert(local_known_sensitivities[os.path.basename(msname)][field][intent][intSpw]['effChanBW'], 'Hz'))[0]
sens_bws[intSpw] = cqa.getvalue(cqa.convert(local_known_sensitivities[os.path.basename(msname)][field][intent][intSpw]['sensBW'], 'Hz'))[0]
LOG.info('Using previously calculated full SPW apparentsens value of %.3g Jy/beam'
' for EB %s Field %s Intent %s SPW %s' % (center_field_full_spw_sensitivity,
os.path.basename(msname).replace('.ms', ''),
field, intent, str(intSpw)))
except Exception as e:
calc_sens = True
center_field_full_spw_sensitivity, eff_ch_bw, sens_bws[intSpw] = self.get_sensitivity(ms, center_field_ids[ms_index], intent, intSpw, chansel_full, specmode, cell, imsize, weighting, robust, uvtaper)
channel_flags = self.get_channel_flags(msname, field, intSpw)
nchan_unflagged = np.where(channel_flags == False)[0].shape[0]
local_known_sensitivities['recalc'] = True
local_known_sensitivities['robust'] = robust
local_known_sensitivities['uvtaper'] = uvtaper
utils.set_nested_dict(local_known_sensitivities, (os.path.basename(msname), field, intent, intSpw, 'sensitivityAllChans'), '%.3g Jy/beam' % (center_field_full_spw_sensitivity))
utils.set_nested_dict(local_known_sensitivities, (os.path.basename(msname), field, intent, intSpw, 'nchanUnflagged'), nchan_unflagged)
utils.set_nested_dict(local_known_sensitivities, (os.path.basename(msname), field, intent, intSpw, 'effChanBW'), '%s Hz' % (eff_ch_bw))
utils.set_nested_dict(local_known_sensitivities, (os.path.basename(msname), field, intent, intSpw, 'sensBW'), '%s Hz' % (sens_bws[intSpw]))
# Correct from full spw to channel selection
if nchan_sel != nchan_unflagged:
chansel_corrected_center_field_sensitivity = center_field_full_spw_sensitivity * (float(nchan_unflagged) / float(nchan_sel)) ** 0.5
sens_bws[intSpw] = sens_bws[intSpw] * float(nchan_sel) / float(spw_do.num_channels)
LOG.info('Channel selection bandwidth heuristic (nbin or findcont; (spw BW / nchan_sel BW) ** 0.5):'
' Correcting sensitivity for EB %s Field %s SPW %s by %.3g from %.3g Jy/beam to'
' %.3g Jy/beam' % (os.path.basename(msname).replace('.ms', ''), field, str(intSpw),
(float(nchan_unflagged) / float(nchan_sel)) ** 0.5,
center_field_full_spw_sensitivity,
chansel_corrected_center_field_sensitivity))
else:
chansel_corrected_center_field_sensitivity = center_field_full_spw_sensitivity
# Correct for effective bandwidth effects
bw_corr_factor, physicalBW_of_1chan, effectiveBW_of_1chan = self.get_bw_corr_factor(ms, intSpw, nchan_sel)
center_field_sensitivity = chansel_corrected_center_field_sensitivity * bw_corr_factor
if bw_corr_factor != 1.0:
LOG.info('Effective BW heuristic: Correcting sensitivity for EB %s Field %s SPW %s by %.3g from %.3g Jy/beam to %.3g Jy/beam' % (os.path.basename(msname).replace('.ms', ''), field, str(intSpw), bw_corr_factor, chansel_corrected_center_field_sensitivity, center_field_sensitivity))
if gridder == 'mosaic':
# Correct for mosaic overlap factor
source_name = [f.source.name for f in ms.fields if (utils.dequote(f.name) == utils.dequote(field) and intent in f.intents)][0]
diameter = np.median([a.diameter for a in ms.antennas])
overlap_factor = mosaicoverlap.mosaicOverlapFactorMS(ms, source_name, intSpw, diameter)
LOG.info('Dividing by mosaic overlap improvement factor of %s corrects sensitivity for EB %s'
' Field %s SPW %s from %.3g Jy/beam to %.3g Jy/beam.'
'' % (overlap_factor, os.path.basename(msname).replace('.ms', ''), field, str(intSpw),
center_field_sensitivity, center_field_sensitivity / overlap_factor))
center_field_sensitivity = center_field_sensitivity / overlap_factor
if calc_sens and not center_only:
# Calculate diagnostic sensitivities for first and last field
first_field_id = min(int(i) for i in field_ids[ms_index].split(','))
first_field_full_spw_sensitivity, first_field_eff_ch_bw, first_field_sens_bw = self.get_sensitivity(ms, first_field_id, intent, intSpw, chansel_full, specmode, cell, imsize, weighting, robust, uvtaper)
first_field_sensitivity = first_field_full_spw_sensitivity * (float(nchan_unflagged) / float(nchan_sel)) ** 0.5 * bw_corr_factor / overlap_factor
last_field_id = max(int(i) for i in field_ids[ms_index].split(','))
last_field_full_spw_sensitivity, last_field_eff_ch_bw, last_field_sens_bw = self.get_sensitivity(ms, last_field_id, intent, intSpw, chansel_full, specmode, cell, imsize, weighting, robust, uvtaper)
last_field_sensitivity = last_field_full_spw_sensitivity * (float(nchan_unflagged) / float(nchan_sel)) ** 0.5 * bw_corr_factor / overlap_factor
LOG.info('Corrected sensitivities for EB %s, Field %s, SPW %s for the first, central, and'
' last pointings are: %.3g / %.3g / %.3g Jy/beam'
'' % (os.path.basename(msname).replace('.ms', ''), field, str(real_spwid),
first_field_sensitivity, center_field_sensitivity, last_field_sensitivity))
sensitivities.append(center_field_sensitivity)
except Exception as e:
# Simply pass as this could be a case of a source not
# being present in the MS.
pass
if (len(sensitivities) > 0):
sensitivity = 1.0 / np.sqrt(np.sum(1.0 / np.array(sensitivities) ** 2))
# Calculate total bandwidth for this selection
sens_bw = sum(sens_bws.values())
if specmode == 'cont' and spw_topo_chan_param_dict == {}:
# Correct for spw frequency overlaps
agg_bw = cqa.getvalue(cqa.convert(self.aggregate_bandwidth(list(map(int, spw.split(',')))), 'Hz'))
sensitivity = sensitivity * (sens_bw / agg_bw) ** 0.5
sens_bw = agg_bw
LOG.info('Final sensitivity estimate for Field %s, SPW %s specmode %s: %.3g Jy/beam', field, str(spw), specmode, sensitivity)
else:
defaultSensitivity = None
LOG.warning('Exception in calculating sensitivity.')
sensitivity = defaultSensitivity
sens_bw = None
return sensitivity, eff_ch_bw, sens_bw, copy.deepcopy(local_known_sensitivities)
[docs] def get_sensitivity(self, ms_do, field, intent, spw, chansel, specmode, cell, imsize, weighting, robust, uvtaper):
"""
Get sensitivity for a field / spw / chansel combination from CASA's
apparentsens method and a correction for effective channel widths
in case of online smoothing.
This heuristic is currently optimized for ALMA data only.
"""
real_spwid = self.observing_run.virtual2real_spw_id(spw, ms_do)
chansel_sensitivities = []
sens_bw = 0.0
for chanrange in chansel.split(';'):
scanids = [str(scan.id) for scan in ms_do.scans
if intent in scan.intents
and field in [fld.name for fld in scan.fields]]
scanids = ','.join(scanids)
antenna_ids = self.antenna_ids(intent, [os.path.basename(ms_do.name)])
taql = '||'.join(['ANTENNA1==%d' % i for i in antenna_ids[os.path.basename(ms_do.name)]])
try:
with casa_tools.SelectvisReader(ms_do.name, spw='%s:%s' % (real_spwid, chanrange), field=field,
scan=scanids, taql=taql) as imTool:
imTool.defineimage(mode=specmode if specmode == 'cube' else 'mfs', spw=real_spwid, cellx=cell[0],
celly=cell[0], nx=imsize[0], ny=imsize[1])
imTool.weight(type=weighting, rmode='norm', robust=robust)
if uvtaper not in (None, []):
if len(uvtaper) == 1:
bmaj = bmin = uvtaper[0]
bpa = '0.0deg'
elif len(uvtaper) == 3:
bmaj, bmin, bpa = uvtaper
else:
raise Exception('Unknown uvtaper format: %s' % (str(uvtaper)))
imTool.filter(type='gaussian', bmaj=bmaj, bmin=bmin, bpa=bpa)
result = imTool.apparentsens()
if result[1] == 0.0:
raise Exception('Empty selection')
apparentsens_value = result[1]
LOG.info('apparentsens result for EB %s Field %s SPW %s ChanRange %s: %s Jy/beam'
'' % (os.path.basename(ms_do.name).replace('.ms', ''), field, real_spwid, chanrange,
apparentsens_value))
cstart, cstop = list(map(int, chanrange.split('~')))
nchan = cstop - cstart + 1
SCF, physicalBW_of_1chan, effectiveBW_of_1chan = self.get_bw_corr_factor(ms_do, spw, nchan)
sens_bw += nchan * physicalBW_of_1chan
chansel_sensitivities.append(apparentsens_value)
except Exception as e:
if (str(e) != 'Empty selection'):
LOG.info('Could not calculate sensitivity for EB %s Field %s SPW %s ChanRange %s: %s'
'' % (os.path.basename(ms_do.name).replace('.ms', ''), field, real_spwid, chanrange, e))
if (len(chansel_sensitivities) > 0):
return 1.0 / np.sqrt(np.sum(1.0 / np.array(chansel_sensitivities) ** 2)), effectiveBW_of_1chan, sens_bw
else:
return 0.0, effectiveBW_of_1chan, sens_bw
[docs] def dr_correction(self, threshold, dirty_dynamic_range, residual_max, intent, tlimit):
"""Adjustment of cleaning threshold due to dynamic range limitations."""
DR_correction_factor = 1.0
maxEDR_used = False
return threshold, DR_correction_factor, maxEDR_used
[docs] def niter_correction(self, niter, cell, imsize, residual_max, threshold, residual_robust_rms, mask_frac_rad=0.0):
"""Adjustment of number of cleaning iterations due to mask size.
Circular mask is assumed with a radius equal to mask_frac_rad times the longest image dimension
in arcsec. If mask_frac_rad=0.0, then no new cleaning iteration step number is estimate and the
input niter value is returned.
Note that the method assumes synthesized_beam = 5 * cell. This might lead to underestimated new
niter value if the beam is sampled by less than 5 pixels (e.g. when hif_checkproductsize() task
was called earlier.
:param niter: User or heuristics set number of cleaning iterations
:param cell: Image cell size in arcsec
:param imsize: Two element list or array of image pixel count along x and y axis
:param residual_max: Dirty image residual maximum [Jy].
:param threshold: Cleaning threshold value [Jy].
:param residual_robust_rms: Residual scaled MAD [Jy] (not used in base method).
:param mask_frac_rad: Mask radius is given by mask_frac_rad * max(imsize) * cell [arcsec]
:return: Modified niter value based on mask and beam size heuristic.
"""
# Default case: do not estimate new niter
if mask_frac_rad == 0.0:
return niter
# Estimate new niter based on circular mask
qaTool = casa_tools.quanta
threshold_value = qaTool.convert(threshold, 'Jy')['value']
# Compute automatic niter estimate
old_niter = niter
kappa = 5
loop_gain = 0.1
# TODO: Replace with actual pixel counting rather than assumption about geometry
r_mask = mask_frac_rad * max(imsize[0], imsize[1]) * qaTool.convert(cell[0], 'arcsec')['value']
# Assume that the beam is 5 pixels wide, this might be incorrect in some cases (see docstring).
# TODO: Pass synthesized beam size explicitly rather than assuming a
# certain ratio of beam to cell size (which can be different
# if the product size is being mitigated or if a different
# imaging_mode uses different heuristics).
beam = qaTool.convert(cell[0], 'arcsec')['value'] * 5.0
new_niter_f = int(kappa / loop_gain * (r_mask / beam) ** 2 * residual_max / threshold_value)
new_niter = int(utils.round_half_up(new_niter_f, -int(np.log10(new_niter_f))))
# Prevent int overflow in tclean CASA task (see CAS-11847)
new_niter = min(new_niter, 2 ** 31 - 1)
if new_niter != old_niter:
LOG.info('niter heuristic: Modified niter from %d to %d based on mask vs. beam size heuristic'
'' % (old_niter, new_niter))
return new_niter
[docs] def niter(self):
return None
[docs] def get_autobox_params(self, iteration, intent, specmode, robust):
"""Default auto-boxing parameters."""
sidelobethreshold = None
noisethreshold = None
lownoisethreshold = None
negativethreshold = None
minbeamfrac = None
growiterations = None
dogrowprune = None
minpercentchange = None
fastnoise = None
return (sidelobethreshold, noisethreshold, lownoisethreshold, negativethreshold, minbeamfrac, growiterations,
dogrowprune, minpercentchange, fastnoise)
[docs] def nterms(self, spwspec):
return None
[docs] def cyclefactor(self, iteration):
return None
[docs] def cycleniter(self, iteration):
return None
[docs] def scales(self):
return None
[docs] def uvtaper(self, beam_natural=None, protect_long=None):
return None
[docs] def uvrange(self, field=None, spwspec=None):
return None, None
[docs] def reffreq(self):
return None
[docs] def restfreq(self):
return None
[docs] def conjbeams(self):
return None
[docs] def pb_correction(self):
return True
[docs] def antenna_diameters(self, vislist=None):
"""Count the antennas of given diameters per MS."""
if vislist is None:
local_vislist = self.vislist
else:
local_vislist = vislist
antenna_diameters = {}
for vis in local_vislist:
for antenna in self.observing_run.get_ms(vis).antennas:
if antenna.diameter not in antenna_diameters:
antenna_diameters[antenna.diameter] = 0
antenna_diameters[antenna.diameter] += 1
return antenna_diameters
[docs] def majority_antenna_ids(self, vislist=None):
"""Get the IDs of the majority (by diameter) antennas per MS."""
if vislist is None:
local_vislist = self.vislist
else:
local_vislist = vislist
# Determine majority diameter
antenna_diameters = self.antenna_diameters(local_vislist)
majority_diameter = sorted(antenna_diameters.items(), key=operator.itemgetter(1))[-1][0]
majority_antenna_ids = {}
for vis in local_vislist:
majority_antenna_ids[os.path.basename(vis)] = [antenna.id
for antenna in self.observing_run.get_ms(vis).antennas
if antenna.diameter == majority_diameter]
return majority_antenna_ids
[docs] def antenna_ids(self, intent, vislist=None):
"""Get the antenna IDs to be used for imaging."""
if vislist is None:
local_vislist = self.vislist
else:
local_vislist = vislist
if intent != 'TARGET':
# For calibrators use all antennas
antenna_ids = {}
for vis in local_vislist:
antenna_ids[os.path.basename(vis)] = [antenna.id for antenna in self.observing_run.get_ms(vis).antennas]
return antenna_ids
else:
# For science targets use majority antennas only
return self.majority_antenna_ids(local_vislist)
[docs] def check_psf(self, psf_name, field, spw):
"""Check for bad psf fits."""
cqa = casa_tools.quanta
bad_psf_fit = False
with casa_tools.ImageReader(psf_name) as image:
try:
beams = image.restoringbeam()['beams']
bmaj = np.array([cqa.getvalue(cqa.convert(b['*0']['major'], 'arcsec')) for b in beams.values()])
# Filter empty psf planes
bmaj = bmaj[np.where(bmaj > 1e-6)]
bmaj_median = np.median(bmaj)
cond1 = np.logical_and(0.0 < bmaj, bmaj < 0.5 * bmaj_median)
cond2 = bmaj > 2.0 * bmaj_median
if np.logical_or(cond1, cond2).any():
bad_psf_fit = True
LOG.warn('The PSF fit for one or more channels for field %s SPW %s failed, please check the'
' results for this cube carefully, there are likely data issues.' % (field, spw))
except:
pass
return bad_psf_fit
[docs] def usepointing(self):
"""tclean flag to use pointing table."""
# Currently ALMA and VLA do not want to use the table (CAS-11840).
return False
[docs] def mosweight(self, intent, field):
"""tclean flag to use mosaic weighting."""
# Currently only ALMA has decided to use this flag (CAS-11840). So
# the default is set to False here.
return False
[docs] def keep_iterating(self, iteration, hm_masking, tclean_stopcode, dirty_dynamic_range, residual_max, residual_robust_rms, field, intent, spw, specmode):
"""Determine if another tclean iteration is necessary."""
if iteration == 0:
return True, hm_masking
else:
return False, hm_masking
[docs] def threshold(self, iteration, threshold, hm_masking):
if iteration == 0:
return '0.0mJy'
else:
return threshold
[docs] def nsigma(self, iteration, hm_nsigma):
return hm_nsigma
[docs] def savemodel(self, iteration):
return None
[docs] def stokes(self):
return 'I'
[docs] def mask(self):
return ''
[docs] def specmode(self):
return 'mfs'
[docs] def datacolumn(self):
return None
[docs] def wprojplanes(self):
return None
[docs] def rotatepastep(self):
return None
[docs] def find_good_commonbeam(self, psf_filename):
"""
Find and replace outlier beams to calculate a good common beam.
Method from Urvashi Rao.
Returns new common beam and array of channel numbers with invalid beams.
Leaves old beams in the PSF as is.
"""
cqa = casa_tools.quanta
with casa_tools.ImageReader(psf_filename) as image:
allbeams = image.restoringbeam()
commonbeam = image.commonbeam()
nchan = allbeams['nChannels']
areas = np.zeros(nchan, 'float')
axratio = np.zeros(nchan, 'float')
weight = np.zeros(nchan, 'bool')
for ii in range(0, nchan):
axmajor = cqa.convert(cqa.quantity(allbeams['beams']['*'+str(ii)]['*0']['major']), 'arcsec')['value']
axminor = cqa.convert(cqa.quantity(allbeams['beams']['*'+str(ii)]['*0']['minor']), 'arcsec')['value']
areas[ii] = axmajor * axminor
axratio[ii] = axmajor/axminor
weight[ii] = np.isfinite( axratio[ii] )
## Detect outliers based on axis ratio
## The iterative loop is to get robust autoflagging
## Add a linear fit instead of just a 'mean' to account for slopes (useful for flagging on beam_area)
local_axratio = axratio.copy()
for steps in range(0, 2): ### Heuristic : how many iterations here ?
local_axratio[ weight==False ] = np.nan
mean_axrat = np.nanmean(local_axratio)
std_axrat = np.nanstd(local_axratio)
## Flag all points deviating from the mean by 3 times stdev
weight[ np.fabs(local_axratio-mean_axrat) > 3 * std_axrat ] = False
## Find the first channel with a valid beam
validbeamchans = np.where(weight)
chanid=0
if len(validbeamchans)>0:
chanid=validbeamchans[0][0]
else:
LOG.error('No valid beams in {!s}'.format(psf_filename))
return None, np.arange(nchan)
## Fill all flagged channels with the largest/first valid beam
dummybeam = allbeams['beams']['*'+str(chanid)]['*0']
for ii in range(0, nchan):
if weight[ii]==False:
image.setrestoringbeam( major=dummybeam['major'], minor=dummybeam['minor'], pa=dummybeam['positionangle'], channel=ii)
## Need to close and reopen for commonbeam() to see the new chan beams !
with casa_tools.ImageReader(psf_filename) as image:
## Recalculate common beam
newcommonbeam = image.commonbeam()
## Reinstate the old beam to get back to the original iter0 product
with casa_tools.ImageReader(psf_filename) as image:
for ii in range(0, nchan):
if weight[ii]==False:
beam = allbeams['beams']['*'+str(ii)]['*0']
image.setrestoringbeam( major=beam['major'], minor=beam['minor'], pa=beam['positionangle'], channel=ii )
return newcommonbeam, np.where(np.logical_not(weight))[0]