"""
Created on 01 Jun 2017
@author: Vincent Geers (UKATC)
"""
import functools
import os.path
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.common.displays import applycal as applycal_displays
from pipeline.h.tasks.flagging.flagdatasetter import FlagdataSetter
from pipeline.hif.tasks import applycal
from pipeline.hif.tasks import correctedampflag
from pipeline.hif.tasks import gaincal
from pipeline.hifa.heuristics.phasespwmap import get_spspec_to_spwid_map
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from .resultobjects import GfluxscaleflagResults
__all__ = [
'GfluxscaleflagInputs',
'GfluxscaleflagResults',
'Gfluxscaleflag'
]
LOG = infrastructure.get_logger(__name__)
[docs]@task_registry.set_equivalent_casa_task('hifa_gfluxscaleflag')
@task_registry.set_casa_commands_comment(
'This task calls hif_correctedampflag to evaluate flagging heuristics on the phase calibrator and flux calibrator, '
'looking for outlier visibility points by statistically examining the scalar difference of corrected amplitudes '
'minus model amplitudes, and flagging those outliers.'
)
class Gfluxscaleflag(basetask.StandardTaskTemplate):
Inputs = GfluxscaleflagInputs
[docs] def prepare(self):
inputs = self.inputs
# Initialize results.
result = GfluxscaleflagResults()
# Store the vis in the result
result.vis = inputs.vis
result.plots = dict()
# create a shortcut to the plotting function that pre-supplies the inputs and context
plot_fn = functools.partial(create_plots, inputs, inputs.context)
# Create back-up of flags.
LOG.info('Creating back-up of "pre-gfluxscaleflag" flagging state')
flag_backup_name_pregfsf = 'before_gfsflag'
task = casa_tasks.flagmanager(
vis=inputs.vis, mode='save', versionname=flag_backup_name_pregfsf)
self._executor.execute(task)
# Determine the parameters to use for the gaincal to create the
# phase-only caltable.
if inputs.ms.combine_spwmap:
phase_combine = 'spw'
phaseup_spwmap = inputs.ms.combine_spwmap
phase_interp = 'linearPD,linear'
# Note: at present, phaseupsolint is specified as a fixed
# value, defined in inputs. In the future, phaseupsolint may
# need to be set based on exposure times; if so, see discussion
# in CAS-10158 and logic in hifa.tasks.fluxscale.GcorFluxscale.
else:
phase_combine = ''
phaseup_spwmap = inputs.ms.phaseup_spwmap
phase_interp = None
# Create phase caltable and merge it into the local context.
LOG.info('Compute phase gaincal table.')
self._do_gaincal(
intent=inputs.intent, gaintype='G', calmode='p',
combine=phase_combine, solint=inputs.phaseupsolint,
minsnr=inputs.minsnr, refant=inputs.refant,
spwmap=phaseup_spwmap, interp=phase_interp,
merge=True)
# Create amplitude caltable and merge it into the local context.
# CAS-10491: for scan-based (solint='inf') amplitude solves that
# will be applied to the calibrator, set interp to 'nearest'.
LOG.info('Compute amplitude gaincal table.')
if inputs.solint == 'inf':
self._do_gaincal(
intent=inputs.intent, gaintype='T', calmode='a',
combine='', solint=inputs.solint,
minsnr=inputs.minsnr, refant=inputs.refant,
interp='nearest,linear', merge=True)
else:
self._do_gaincal(
intent=inputs.intent, gaintype='T', calmode='a',
combine='', solint=inputs.solint,
minsnr=inputs.minsnr, refant=inputs.refant,
interp='linear,linear', merge=True)
# Ensure that any flagging applied to the MS by this applycal are
# reverted at the end, even in the case of exceptions.
try:
# Apply the new caltables to the MS.
LOG.info('Applying phase-up, bandpass, and amplitude cal tables.')
# Apply the calibrations.
callib_map = self._do_applycal(merge=False)
# copy across the vis:callibrary dict to our result. This dict
# will be inspected by the renderer to know if/which callibrary
# files should be copied across to the weblog stage directory
result.callib_map.update(callib_map)
# Make "after calibration, before flagging" plots for the weblog
LOG.info('Creating "after calibration, before flagging" plots')
result.plots['before'] = plot_fn(inputs.intent, suffix='before')
# Run correctedampflag to identify outliers for intents specified in
# intents_for_flagging; let "field" and "spw" be initialized
# automatically based on intents and context.
LOG.info('Running correctedampflag to identify outliers to flag.')
cafinputs = correctedampflag.Correctedampflag.Inputs(
context=inputs.context, vis=inputs.vis, intent=inputs.intent,
antnegsig=inputs.antnegsig, antpossig=inputs.antpossig,
tmantint=inputs.tmantint, tmint=inputs.tmint, tmbl=inputs.tmbl,
antblnegsig=inputs.antblnegsig, antblpossig=inputs.antblpossig,
relaxed_factor=inputs.relaxed_factor, niter=inputs.niter)
caftask = correctedampflag.Correctedampflag(cafinputs)
cafresult = self._executor.execute(caftask)
# If flags were found in the bandpass calibrator...
cafflags = cafresult.flagcmds()
if cafflags:
# Create the "after calibration, after flagging" plots for the weblog.
LOG.info('Creating "after calibration, after flagging" plots')
result.plots['after'] = plot_fn(inputs.intent, suffix='after')
finally:
# Restore the "pre-gfluxscaleflag" backup of the flagging state, to
# undo any flags that were propagated from caltables to the MS by
# the applycal call.
LOG.info('Restoring back-up of "pre-gfluxscaleflag" flagging state.')
task = casa_tasks.flagmanager(
vis=inputs.vis, mode='restore', versionname=flag_backup_name_pregfsf)
self._executor.execute(task)
# If new outliers were identified...
if cafflags:
# Re-apply the newly found flags from correctedampflag.
LOG.info('Re-applying flags from correctedampflag.')
fsinputs = FlagdataSetter.Inputs(
context=inputs.context, vis=inputs.vis, table=inputs.vis,
inpfile=[])
fstask = FlagdataSetter(fsinputs)
fstask.flags_to_set(cafflags)
fsresult = self._executor.execute(fstask)
# Store flagging task result.
result.cafresult = cafresult
return result
[docs] def analyse(self, result):
return result
def _do_applycal(self, merge):
inputs = self.inputs
# SJW - always just one job
ac_intents = [inputs.intent]
applycal_tasks = []
for intent in ac_intents:
task_inputs = applycal.IFApplycalInputs(inputs.context, vis=inputs.vis, intent=intent, flagsum=False,
flagbackup=False)
task = applycal.IFApplycal(task_inputs)
applycal_tasks.append(task)
callib_map = {}
for task in applycal_tasks:
applycal_result = self._executor.execute(task, merge=merge)
callib_map.update(applycal_result.callib_map)
return callib_map
def _do_gaincal(self, caltable=None, intent=None, gaintype='G',
calmode=None, combine=None, solint=None, antenna=None,
uvrange='', minsnr=None, refant=None, minblperant=None,
spwmap=None, interp=None, append=None, merge=True):
inputs = self.inputs
ms = inputs.ms
# Get the science spws
request_spws = ms.get_spectral_windows(task_arg=inputs.spw)
targeted_scans = ms.get_scans(scan_intent=intent, spw=inputs.spw)
# boil it down to just the valid spws for these fields and request
scan_spws = {spw for scan in targeted_scans for spw in scan.spws if spw in request_spws}
for spectral_spec, tuning_spw_ids in get_spspec_to_spwid_map(scan_spws).items():
tuning_spw_str = ','.join([str(i) for i in sorted(tuning_spw_ids)])
LOG.info('Processing spectral spec {}, spws {}'.format(spectral_spec, tuning_spw_str))
scans_with_data = ms.get_scans(spw=tuning_spw_str, scan_intent=intent)
if not scans_with_data:
LOG.info('No data to process for spectral spec {}. Continuing...'.format(spectral_spec))
continue
# of the fields that we are about to process, does any field have
# multiple intents?
mixed_intents = False
fields_in_scans = {fld for scan in scans_with_data for fld in scan.fields}
singular_intents = frozenset(intent.split(','))
if len(singular_intents) > 1:
for field in fields_in_scans:
intents_to_scans = {si: ms.get_scans(scan_intent=si, field=field.id, spw=tuning_spw_str)
for si in singular_intents}
valid_intents = [k for k, v in intents_to_scans.items() if v]
if len(valid_intents) > 1:
mixed_intents = True
break
if mixed_intents and solint == 'inf':
# multiple items, one for each intent. Each intent will result
# in a separate job.
# Make sure data for the intent exists (PIPE-367)
intents_to_scans = {si: [scan for scan in scans_with_data if si in scan.intents]
for si in singular_intents}
valid_intents = [k for k, v in intents_to_scans.items() if v]
task_intents = valid_intents
else:
# one item, and hence one job, with 'PHASE,BANDPASS,...'
task_intents = [','.join(singular_intents)]
for intent in task_intents:
# Initialize gaincal inputs.
task_inputs = gaincal.GTypeGaincal.Inputs(
inputs.context,
vis=inputs.vis,
caltable=caltable,
intent=intent,
spw=tuning_spw_str,
solint=solint,
gaintype=gaintype,
calmode=calmode,
minsnr=minsnr,
combine=combine,
refant=refant,
antenna=antenna,
uvrange=uvrange,
minblperant=minblperant,
solnorm=False,
append=append)
# if we need to generate multiple caltables, make the caltable
# names unique by inserting the intent to prevent them overwriting
# each other
if len(task_intents) > 1:
root, ext = os.path.splitext(task_inputs.caltable)
task_inputs.caltable = '{!s}.{!s}{!s}'.format(root, intent, ext)
# Modify output table filename to append "prelim".
if task_inputs.caltable.endswith('.tbl'):
task_inputs.caltable = task_inputs.caltable[:-4] + '.prelim.tbl'
else:
task_inputs.caltable += '.prelim'
# Initialize and execute gaincal task.
task = gaincal.GTypeGaincal(task_inputs)
result = self._executor.execute(task)
# modify the result so that this caltable is only applied to
# the intent from which the calibration was derived
calapp_overrides = dict(intent=intent)
# Adjust the spw map if provided.
if spwmap:
calapp_overrides['spwmap'] = spwmap
# https://open-jira.nrao.edu/browse/PIPE-367?focusedCommentId=141097&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-141097
#
# '... then to apply this table you need the same spw map that
# is printed in the spwphaseup stage'
if combine == 'spw':
calapp_overrides['spwmap'] = ms.combine_spwmap
# Adjust the interp if provided.
if interp:
calapp_overrides['interp'] = interp
calapp = result.final[0]
modified = callibrary.copy_calapplication(calapp, **calapp_overrides)
result.pool[0] = modified
result.final[0] = modified
# If requested, merge the result...
if merge:
# Merge result to the local context
result.accept(inputs.context)
def create_plots(inputs, context, intents, suffix=''):
"""
Return amplitude vs time and amplitude vs UV distance plots for the given
intents.
:param inputs: pipeline inputs
:param context: pipeline context
:param intents: intents to plot
:param suffix: optional component to add to the plot filenames
:return: dict of (x axis type => str, [plots,...])
"""
# Exit early if weblog generation has been disabled
if basetask.DISABLE_WEBLOG:
return [], []
ms = inputs.ms
calto = callibrary.CalTo(vis=inputs.vis, spw=inputs.spw, field=inputs.field)
output_dir = context.output_dir
amp_uvdist_plots, amp_time_plots = [], []
for intent in intents.split(','):
amp_uvdist_plots.extend(
AmpVsXChart('uvdist', intent, context, output_dir, calto, suffix=suffix).plot())
amp_time_plots.extend(
AmpVsXChart('time', intent, context, output_dir, calto, suffix=suffix).plot())
return {
'uvdist': amp_uvdist_plots,
'time': amp_time_plots
}
class AmpVsXChart(applycal_displays.PlotmsFieldSpwComposite):
"""
Plotting class that creates an amplitude vs X plot for each field and spw,
where X is given as a constructor argument.
"""
def __init__(self, xaxis, intent, context, output_dir, calto, **overrides):
plot_args = {
'ydatacolumn': 'corrected',
'avgtime': '',
'avgscan': False,
'avgbaseline': False,
'avgchannel': '9000',
'coloraxis': 'corr',
'overwrite': True,
'plotrange': [0, 0, 0, 0]
}
plot_args.update(**overrides)
super(AmpVsXChart, self).__init__(context, output_dir, calto, xaxis=xaxis, yaxis='amp', intent=intent,
**plot_args)