import numpy as np
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.utils as utils
import pipeline.domain.measures as measures
from pipeline.infrastructure import casa_tools
from .imageparams_base import ImageParamsHeuristics
LOG = infrastructure.get_logger(__name__)
[docs]class ImageParamsHeuristicsALMA(ImageParamsHeuristics):
def __init__(self, vislist, spw, observing_run, imagename_prefix='', proj_params=None, contfile=None,
linesfile=None, imaging_params={}):
ImageParamsHeuristics.__init__(self, vislist, spw, observing_run, imagename_prefix, proj_params, contfile,
linesfile, imaging_params)
self.imaging_mode = 'ALMA'
[docs] def robust(self):
"""robust parameter heuristic."""
if 'robust' in self.imaging_params:
robust = self.imaging_params['robust']
LOG.info('ALMA robust heuristics: Using imageprecheck value of robust=%.1f' % robust)
return robust
else:
return 0.5
[docs] def uvtaper(self, beam_natural=None, protect_long=3):
"""Adjustment of uvtaper parameter based on desired resolution or representative baseline length."""
# Disabled heuristic for ALMA Cycle 6
return []
if 'uvtaper' in self.imaging_params:
uvtaper = self.imaging_params['uvtaper']
LOG.info('ALMA uvtaper heuristics: Using imageprecheck value of uvtaper=%s' % (str(uvtaper)))
return uvtaper
if (beam_natural is None) and (protect_long is None):
return []
cqa = casa_tools.quanta
repr_target, repr_source, repr_spw, repr_freq, reprBW_mode, real_repr_target, minAcceptableAngResolution, maxAcceptableAngResolution, maxAllowedBeamAxialRatio, sensitivityGoal = self.representative_target()
# Protection against spurious long baselines
if protect_long is not None:
l80, min_diameter = self.calc_percentile_baseline_length(80.)
LOG.info('ALMA uvtaper heuristic: L80 baseline length is %.1f meter' % (l80))
c = cqa.getvalue(cqa.convert(cqa.constants('c'), 'm/s'))[0]
uvtaper_value = protect_long * l80 / cqa.getvalue(cqa.convert(cqa.constants('c'), 'm/s'))[0] * cqa.getvalue(cqa.convert(repr_freq, 'Hz'))[0]
uvtaper = ['%.2fklambda' % utils.round_half_up(uvtaper_value/1000., 2)]
return uvtaper
else:
return []
# # Original Cycle 5 heuristic follows below for possible later use.
# if not real_repr_target:
# LOG.info('ALMA uvtaper heuristic: No representative target found. Using uvtaper=[]')
# return []
#
# try:
# beam_natural_v = math.sqrt(cqa.getvalue(cqa.convert(beam_natural['major'], 'arcsec')) * cqa.getvalue(cqa.convert(beam_natural['minor'], 'arcsec')))
# except Exception as e:
# LOG.error('ALMA uvtaper heuristic: Cannot get natural beam size: %s. Using uvtaper=[]' % (e))
# return []
#
# minAR_v = cqa.getvalue(cqa.convert(minAcceptableAngResolution, 'arcsec'))
# maxAR_v = cqa.getvalue(cqa.convert(maxAcceptableAngResolution, 'arcsec'))
#
# if beam_natural_v < 1.1 * maxAR_v:
# beam_taper = math.sqrt(maxAR_v ** 2 - beam_natural_v ** 2)
# uvtaper = ['%.3garcsec' % (beam_taper)]
# else:
# uvtaper = []
#
# return uvtaper
[docs] def dr_correction(self, threshold, dirty_dynamic_range, residual_max, intent, tlimit):
"""Adjustment of cleaning threshold due to dynamic range limitations."""
qaTool = casa_tools.quanta
maxEDR_used = False
DR_correction_factor = 1.0
diameter = self.observing_run.get_measurement_sets()[0].antennas[0].diameter
old_threshold = qaTool.convert(threshold, 'Jy')['value']
if intent == 'TARGET' or intent == 'CHECK':
n_dr_max = 2.5
if diameter == 12.0:
if dirty_dynamic_range > 150.:
maxSciEDR = 150.0
new_threshold = max(n_dr_max * old_threshold, residual_max / maxSciEDR * tlimit)
LOG.info('DR heuristic: Applying maxSciEDR(Main array)=%s' % maxSciEDR)
maxEDR_used = True
else:
if dirty_dynamic_range > 100.:
n_dr = 2.5
elif 50. < dirty_dynamic_range <= 100.:
n_dr = 2.0
elif 20. < dirty_dynamic_range <= 50.:
n_dr = 1.5
elif dirty_dynamic_range <= 20.:
n_dr = 1.0
LOG.info('DR heuristic: N_DR=%s' % n_dr)
new_threshold = old_threshold * n_dr
else:
numberEBs = len(self.vislist)
if numberEBs == 1:
# single-EB 7m array datasets have limited dynamic range
maxSciEDR = 30
dirtyDRthreshold = 30
n_dr_max = 2.5
else:
# multi-EB 7m array datasets will have better dynamic range and can be cleaned somewhat deeper
maxSciEDR = 55
dirtyDRthreshold = 75
n_dr_max = 3.5
if dirty_dynamic_range > dirtyDRthreshold:
new_threshold = max(n_dr_max * old_threshold, residual_max / maxSciEDR * tlimit)
n_dr_effective = new_threshold / old_threshold
LOG.info('DR heuristic: Applying maxSciEDR(ACA)=%s (for %d EB) effective_N_DR=%.2f' %
(maxSciEDR, numberEBs, n_dr_effective))
maxEDR_used = True
else:
if dirty_dynamic_range > 40.:
n_dr = 3.0
elif dirty_dynamic_range > 20.:
n_dr = 2.5
elif 10. < dirty_dynamic_range <= 20.:
n_dr = 2.0
elif 4. < dirty_dynamic_range <= 10.:
n_dr = 1.5
elif dirty_dynamic_range <= 4.:
n_dr = 1.0
LOG.info('DR heuristic: N_DR=%s' % n_dr)
new_threshold = old_threshold * n_dr
else:
# Calibrators are usually dynamic range limited. The sensitivity from apparentsens
# is not a valid estimate for the threshold. Use a heuristic based on the dirty peak
# and some maximum expected dynamic range (EDR) values.
if diameter == 12.0:
maxCalEDR = 1000.0
veryHighCalEDR = 3000.0 # use shallower slope above this value
LOG.info('DR heuristic: Applying maxCalEDR=%s, veryHighCalEDR=%s' % (maxCalEDR, veryHighCalEDR))
matchPoint = veryHighCalEDR/maxCalEDR - 1 # will be 2 for 3000/1000
highDR = tlimit * residual_max / maxCalEDR / old_threshold # will use this value up to 3000
veryHighDR = matchPoint + tlimit * residual_max / veryHighCalEDR / old_threshold # will use this value above 3000
n_dr = max(1.0, min(highDR, veryHighDR))
LOG.info('DR heuristic: Calculating N_DR as max of (1.0, min of (%f, %f)) = %f'
'' % (highDR, veryHighDR, n_dr))
new_threshold = old_threshold * n_dr
else:
maxCalEDR = 200.0
LOG.info('DR heuristic: Applying maxCalEDR=%s' % maxCalEDR)
new_threshold = max(old_threshold, residual_max / maxCalEDR * tlimit)
if new_threshold != old_threshold:
maxEDR_used = True
if new_threshold != old_threshold:
LOG.info('DR heuristic: Modified threshold from %.3g Jy to %.3g Jy based on dirty dynamic range calculated'
' from dirty peak / final theoretical sensitivity: %.1f'
'' % (old_threshold, new_threshold, dirty_dynamic_range))
DR_correction_factor = new_threshold / old_threshold
return '%.3gJy' % (new_threshold), DR_correction_factor, maxEDR_used
[docs] def niter_correction(self, niter, cell, imsize, residual_max, threshold, residual_robust_rms, mask_frac_rad=0.0):
"""Adjustment of number of cleaning iterations due to mask size.
See base class method for parameter description."""
if mask_frac_rad == 0.0:
mask_frac_rad = 0.45 # ALMA specific parameter
return super().niter_correction(niter, cell, imsize, residual_max, threshold, residual_robust_rms,
mask_frac_rad=mask_frac_rad)
[docs] def calc_percentile_baseline_length(self, percentile):
"""Calculate percentile baseline length for the vis list used in this heuristics instance."""
min_diameter = 1.e9
percentileBaselineLengths = []
for msname in self.vislist:
ms_do = self.observing_run.get_ms(msname)
min_diameter = min(min_diameter, min([antenna.diameter for antenna in ms_do.antennas]))
percentileBaselineLengths.append(
np.percentile([float(baseline.length.to_units(measures.DistanceUnits.METRE))
for baseline in ms_do.antenna_array.baselines], percentile))
return np.median(percentileBaselineLengths), min_diameter
[docs] def get_autobox_params(self, iteration, intent, specmode, robust):
"""Default auto-boxing parameters for ALMA main array and ACA."""
# Start with generic defaults
sidelobethreshold = None
noisethreshold = None
lownoisethreshold = None
minbeamfrac = None
growiterations = None
dogrowprune = None
minpercentchange = None
repBaselineLength, min_diameter = self.calc_percentile_baseline_length(75.)
LOG.info('autobox heuristic: Representative baseline length is %.1f meter' % repBaselineLength)
baselineThreshold = 400
# PIPE-307
if min_diameter == 12.0 and repBaselineLength > baselineThreshold:
fastnoise = True
else:
fastnoise = False
if 'TARGET' in intent:
if min_diameter == 12.0:
if repBaselineLength < 300:
sidelobethreshold = 2.0
noisethreshold = 4.25
lownoisethreshold = 1.5
minbeamfrac = 0.3
dogrowprune = True
minpercentchange = 1.0
if specmode == 'cube':
negativethreshold = 15.0
growiterations = 50
else:
negativethreshold = 0.0
growiterations = 75
else:
if repBaselineLength < baselineThreshold:
sidelobethreshold = 2.0
else:
sidelobethreshold = 2.5 # was 3.0
noisethreshold = 5.0
lownoisethreshold = 1.5
minbeamfrac = 0.3
dogrowprune = True
minpercentchange = 1.0
if specmode == 'cube':
negativethreshold = 7.0
growiterations = 50
else:
negativethreshold = 0.0
growiterations = 75
elif min_diameter == 7.0:
sidelobethreshold = 1.25
noisethreshold = 5.0
lownoisethreshold = 2.0
minbeamfrac = 0.1
growiterations = 75
negativethreshold = 0.0
dogrowprune = True
minpercentchange = 1.0
elif 'CHECK' in intent:
if min_diameter == 12.0:
if repBaselineLength < 300:
sidelobethreshold = 2.0
noisethreshold = 4.25
lownoisethreshold = 1.5
minbeamfrac = 0.3
dogrowprune = True
minpercentchange = 1.0
if specmode == 'cube':
negativethreshold = 15.0
growiterations = 50
else:
negativethreshold = 0.0
growiterations = 75
else:
sidelobethreshold = 3.0
noisethreshold = 5.0
lownoisethreshold = 1.5
minbeamfrac = 0.3
dogrowprune = True
minpercentchange = 1.0
if specmode == 'cube':
negativethreshold = 7.0
growiterations = 50
else:
negativethreshold = 0.0
growiterations = 75
elif min_diameter == 7.0:
sidelobethreshold = 1.25
noisethreshold = 5.0
lownoisethreshold = 2.0
minbeamfrac = 0.1
growiterations = 75
negativethreshold = 0.0
dogrowprune = True
minpercentchange = 1.0
else:
if min_diameter == 12.0:
sidelobethreshold = 2.0
noisethreshold = 7.0
lownoisethreshold = 3.0
minbeamfrac = 0.1
growiterations = 75
negativethreshold = 0.0
dogrowprune = True
minpercentchange = 1.0
elif min_diameter == 7.0:
sidelobethreshold = 1.5
noisethreshold = 6.0
lownoisethreshold = 2.0
minbeamfrac = 0.1
growiterations = 75
negativethreshold = 0.0
dogrowprune = True
minpercentchange = 1.0
return sidelobethreshold, noisethreshold, lownoisethreshold, negativethreshold, minbeamfrac, growiterations, dogrowprune, minpercentchange, fastnoise
[docs] def warn_missing_cont_ranges(self):
return True
[docs] def nterms(self, spwspec):
return 2
[docs] def mosweight(self, intent, field):
if self.gridder(intent, field) == 'mosaic':
return True
else:
return False
[docs] def keep_iterating(self, iteration, hm_masking, tclean_stopcode, dirty_dynamic_range, residual_max,
residual_robust_rms, field, intent, spw, specmode):
"""Determine if another tclean iteration is necessary."""
if iteration == 0:
keep_iterating = True
hm_masking = hm_masking
else:
keep_iterating = False
# Check for zero automask
if (hm_masking == 'auto') and (tclean_stopcode == 7):
if intent in ('BANDPASS', 'PHASE', 'AMPLITUDE', 'POLARIZATION', 'POLANGLE', 'POLLEAKAGE'):
if residual_max / residual_robust_rms > 10.0:
LOG.warn('No automatic clean mask was found despite clean residual peak / scaled MAD > 10, '
'switched to pb-based mask and tlimit=4. '
'Field %s Intent %s SPW %s' % (field, intent, spw))
else:
LOG.warn('No automatic clean mask was found, switched to pb-based mask and tlimit=4. Field %s '
'Intent %s SPW %s' % (field, intent, spw))
# If no automask is found, always try the simple circular mask for calibrators
hm_masking = 'centralregion'
keep_iterating = True
elif intent in ('CHECK', 'TARGET'):
if residual_max / residual_robust_rms > 10.0:
if (specmode == 'cube') or (dirty_dynamic_range <= 30.0):
LOG.warn('No automatic clean mask was found despite clean residual peak / scaled MAD > 10, '
'check the results. '
'Field %s Intent %s SPW %s' % (field, intent, spw))
else:
LOG.warn('No automatic clean mask was found despite clean residual peak / scaled MAD > 10, '
'switched to pb-based mask and tlimit=4. '
'Field %s Intent %s SPW %s' % (field, intent, spw))
# If no automask is found, try the simple circular mask for high DR continuum
hm_masking = 'centralregion'
keep_iterating = True
return keep_iterating, hm_masking
[docs] def threshold(self, iteration, threshold, hm_masking):
if iteration == 0:
return '0.0mJy'
elif iteration == 1:
return threshold
else:
# Fallback to circular mask if auto-boxing fails.
# CAS-10489: old centralregion option needs higher threshold
cqa = casa_tools.quanta
return '%sJy' % (cqa.getvalue(cqa.mul(threshold, 2.0))[0])
[docs] def intent(self):
return 'TARGET'