import re
import numpy as np
import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.filenamer as filenamer
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from .imageparams_base import ImageParamsHeuristics
LOG = infrastructure.get_logger(__name__)
[docs]class ImageParamsHeuristicsVLA(ImageParamsHeuristics):
def __init__(self, vislist, spw, observing_run, imagename_prefix='', proj_params=None, contfile=None,
linesfile=None, imaging_params={}):
ImageParamsHeuristics.__init__(self, vislist, spw, observing_run, imagename_prefix, proj_params, contfile,
linesfile, imaging_params)
self.imaging_mode = 'VLA'
[docs] def robust(self):
"""See PIPE-680 and CASR-543"""
return 0.5
[docs] def uvtaper(self, beam_natural=None, protect_long=None):
return []
[docs] def uvrange(self, field=None, spwspec=None):
"""
Restrict uvrange in case of very extended emission.
If the amplitude of the shortest 5 per cent of the covered baselines
is more than 2 times that of the 50-55 per cent baselines, then exclude
the shortest 5 per cent of the baselines.
See PIPE-681 and CASR-543.
:param field:
:param spwspec:
:return: (None or string in the form of '> {x}klambda', where
{x}=0.05*max(baseline), baseline ratio)
"""
def get_mean_amplitude(vis, uvrange=None, axis='amplitude', field='', spw=None):
stat_arg = {'vis': vis, 'uvrange': uvrange, 'axis': axis,
'useflags': True, 'field':field, 'spw': spw,
'correlation': 'LL,RR'}
job = casa_tasks.visstat(**stat_arg)
stats = job.execute(dry_run=False) # returns stat in meter
# Get means of spectral windows with data in the selected uvrange
spws_means = [v['mean'] for (k,v) in stats.items() if np.isfinite(v['mean'])]
# Determine mean and 95% percentile
mean = np.mean(spws_means)
percentile_95 = np.percentile(spws_means, 95)
return (mean, percentile_95)
if not field:
field = ''
if spwspec:
spwids = sorted(set(spwspec.split(',')), key=int) # list
else:
spwids = self.spwids # set
#
qa = casa_tools.quanta
#
LOG.info('Computing uvrange heuristics for field="{:s}", spwsids={:s}'.format(field, ','.join([str(spw) for spw in spwids])))
# Can it be that more than one visibility (ms file) is used?
vis = self.vislist[0]
ms = self.observing_run.get_ms(vis)
# Determine the largest covered baseline in klambda. Assume that
# the maximum baseline is associated with the highest frequency spw.
light_speed = qa.getvalue(qa.convert(qa.constants('c'), 'm/s'))[0]
max_mean_freq_Hz = 0.0 # spw mean frequency in the highest frequency spw
real_spwids = []
for spwid in spwids:
real_spwid = self.observing_run.virtual2real_spw_id(spwid, ms)
spw = ms.get_spectral_window(real_spwid)
real_spwids.append(real_spwid)
mean_freq_Hz = spw.mean_frequency.to_units(measures.FrequencyUnits.HERTZ)
if float(mean_freq_Hz) > max_mean_freq_Hz:
max_mean_freq_Hz = float(mean_freq_Hz)
max_freq_spw = real_spwid
# List of real spws
real_spwids_str = ','.join([str(spw) for spw in real_spwids])
# Check for maximum frequency
if max_mean_freq_Hz == 0.0:
LOG.warn("Highest frequency spw and largest baseline cannot be determined for spwids={:s}. "
"Using default uvrange.".format(','.join([str(spw) for spw in spwids])))
return None, None
# Get max baseline
mean_wave_m = light_speed / max_mean_freq_Hz # in meter
job = casa_tasks.visstat(vis=vis, field=field, spw=str(max_freq_spw), axis='uvrange', useflags=True)
uv_stat = job.execute(dry_run=False) # returns stat in meter
max_bl = uv_stat['DATA_DESC_ID=%s' % max_freq_spw]['max'] / mean_wave_m
# Define bin for lowest 5% of baselines (in wavelength units)
uvll = 0.0
uvul = 0.05 * max_bl
uvrange_SBL = '{:0.1f}~{:0.1f}klambda'.format(uvll / 1000.0, uvul / 1000.0)
mean_SBL, p95_SBL = get_mean_amplitude(vis=vis, uvrange=uvrange_SBL, field=field, spw=real_spwids_str)
# Range for 50-55% bin
uvll = 0.5 * max_bl
uvul = 0.5 * max_bl + 0.05 * max_bl
uvrange_MBL = '{:0.1f}~{:0.1f}klambda'.format(uvll / 1000.0, uvul / 1000.0)
mean_MBL, p95_MBL = get_mean_amplitude(vis=vis, uvrange=uvrange_MBL, field=field, spw=real_spwids_str)
# Compare amplitudes and decide on return value
ratio = p95_SBL / mean_MBL
# Report results
LOG.info('Mean amplitude in uvrange bins: {:s} is {:0.2E}Jy, '
'{:s} is {:0.2E}Jy'.format(uvrange_SBL, mean_SBL, uvrange_MBL, mean_MBL))
LOG.info('95 percentile in uvrange bins: {:s} is {:0.2E}Jy, '
'{:s} is {:0.2E}Jy'.format(uvrange_SBL, p95_SBL, uvrange_MBL, p95_MBL))
LOG.info('Ratio between 95 percentile small baseline bin '
'and mean of middle baseline bin is {:0.2E}'.format(ratio))
if ratio > 2.0:
LOG.info('Selecting uvrange>{:0.1f}klambda to avoid very extended emission.'.format(0.05 * max_bl / 1000.0))
return ">{:0.1f}klambda".format(0.05 * max_bl / 1000.0), ratio
else:
# Use complete uvrange
return '>0.0klambda', ratio
[docs] def pblimits(self, pb):
"""
PB gain level at which to cut off normalizations (tclean parameter).
See PIPE-674 and CASR-543
"""
# pblimits used in pipeline tclean._do_iterative_imaging() method (eventually in cleanbox.py) for
# computing statistics on residual image products.
if (pb not in [None, '']):
pblimit_image, pblimit_cleanmask = super().pblimits(pb)
# used for setting CASA tclean task pblimit parameter in pipeline tclean.prepare() method
else:
pblimit_image = -0.1
pblimit_cleanmask = 0.3
return pblimit_image, pblimit_cleanmask
[docs] def nterms(self, spwspec):
"""
Determine nterms depending on the fractional bandwidth.
Returns 1 if the fractional bandwidth is < 10 per cent, 2 otherwise.
See PIPE-679 and CASR-543
"""
if spwspec is None:
return None
# Fractional bandwidth
fr_bandwidth = self.get_fractional_bandwidth(spwspec)
if (fr_bandwidth >= 0.1):
return 2
else:
return 1
[docs] def deconvolver(self, specmode, spwspec):
"""See PIPE-679 and CASR-543"""
return 'mtmfs'
[docs] def niter_correction(self, niter, cell, imsize, residual_max, threshold, residual_robust_rms, mask_frac_rad=0.0):
"""Adjustment of number of cleaning iterations due to mask size.
Uses residual_robust_rms instead threshold to compute the new niter value.
See PIPE-682 and CASR-543 and base class method for parameter description."""
if mask_frac_rad == 0.0:
# The motivation here is that while EVLA images can be large, only a small fraction of pixels
# will typically have emission (for continuum images).
mask_frac_rad = 0.05
# VLA specific threshold
threshold_vla = casa_tools.quanta.quantity(self.nsigma(0, None) * residual_robust_rms, 'Jy')
# Set allowed niter range
max_niter = 1000000
min_niter = 10000
# Compute new niter
new_niter = super().niter_correction(niter, cell, imsize, residual_max, threshold_vla, residual_robust_rms,
mask_frac_rad=mask_frac_rad)
# Apply limits
if new_niter < min_niter:
LOG.info('niter heuristic: Modified niter %d is smaller than lower limit (%d)' % (new_niter, min_niter))
new_niter = min_niter
elif new_niter > max_niter:
LOG.info('niter heuristic: Modified niter %d is larger than upper limit (%d)' % (new_niter, max_niter))
new_niter = max_niter
return new_niter
[docs] def specmode(self):
"""See PIPE-683 and CASR-543"""
return 'cont'
[docs] def nsigma(self, iteration, hm_nsigma):
"""See PIPE-678 and CASR-543"""
if hm_nsigma:
return hm_nsigma
else:
return 5.0
[docs] def threshold(self, iteration, threshold, hm_masking):
"""See PIPE-678 and CASR-543"""
if iteration == 0 or hm_masking in ['none']:
return '0.0mJy'
else:
return threshold
[docs] def imsize(self, fields, cell, primary_beam, sfpblimit=None, max_pixels=None,
centreonly=False, vislist=None, spwspec=None):
"""
Image size heuristics for single fields and mosaics. The pixel count along x and y image dimensions
is determined by the cell size, primary beam size and the spread of phase centers in case of mosaics.
Frequency dependent image size may be computed for VLA imaging.
For single fields, 18 GHz and above FOV extends to the first minimum of the primary beam Airy pattern.
Below 18 GHz, FOV extends to the second minimum (incorporating the first sidelobes).
See PIPE-675 and CASR-543
:param fields: list of comma separated strings of field IDs per MS.
:param cell: pixel (cell) size in arcsec.
:param primary_beam: primary beam width in arcsec.
:param sfpblimit: single field primary beam response. If provided then imsize is chosen such that the image
edge is at normalised primary beam level equals to sfpblimit.
:param max_pixels: maximum allowed pixel count, integer. The same limit is applied along both image axes.
:param centreonly: if True, then ignore the spread of field centers.
:param vislist: list of visibility path string to be used for imaging. If not set then use all visibilities
in the context.
:param spwspec: ID list of spectral windows used to create image product. List or string containing comma
separated spw IDs list.
:return: two element list of pixel count along x and y image axes.
"""
if spwspec is not None:
if type(spwspec) is not str:
spwspec = ",".join(spwspec)
freq_limits = self.get_min_max_freq(spwspec)
# 18 GHz and above (K, Ka, Q VLA bands)
if freq_limits['abs_min_freq'] >= 1.79e10:
# equivalent to first minimum of the Airy diffraction pattern; m = 1.22.
sfpblimit = 0.294
else:
# equivalent to second minimum of the Airy diffraction pattern; m = 2.233 in theta = m*lambda/D
sfpblimit = 0.016
return super().imsize(fields, cell, primary_beam, sfpblimit=sfpblimit, max_pixels=max_pixels,
centreonly=centreonly, vislist=vislist)
[docs] def imagename(self, output_dir=None, intent=None, field=None, spwspec=None, specmode=None, band=None):
try:
nameroot = self.imagename_prefix
if nameroot == 'unknown':
nameroot = 'oussid'
# need to sanitize the nameroot here because when it's added
# to filenamer as an asdm, os.path.basename is run on it with
# undesirable results.
nameroot = filenamer.sanitize(nameroot)
except:
nameroot = 'oussid'
namer = filenamer.Image()
namer._associations.asdm(nameroot)
if output_dir:
namer.output_dir(output_dir)
namer.stage('STAGENUMBER')
if intent:
namer.intent(intent)
if field:
namer.source(field)
if specmode != 'cont' and spwspec:
# find all the spwids present in the list
p = re.compile(r"[ ,]+(\d+)")
spwids = p.findall(' %s' % spwspec)
spwids = list(set(spwids))
spw = '_'.join(map(str, sorted(map(int, spwids))))
namer.spectral_window(spw)
if specmode == 'cont' and band:
namer.band('{}_band'.format(band))
if specmode:
namer.specmode(specmode)
# filenamer returns a sanitized filename (i.e. one with
# illegal characters replace by '_'), no need to check
# the name components individually.
imagename = namer.get_filename()
return imagename