import inspect
import numpy as np
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.common import commonhelpermethods
from pipeline.h.tasks.common import viewflaggers
from pipeline.hifa.tasks import bandpass
from pipeline.infrastructure import task_registry
from . import wvrgcalflagsetter
from .resultobjects import WvrgcalflagResults, WvrgcalflagViewResults
from ..wvrgcal import wvrgcal
# Names exported by this module when it is star-imported.
__all__ = [
'Wvrgcalflag',
'WvrgcalflagInputs'
]
# Module-level logger, obtained from the pipeline infrastructure for this
# module's name; used by the task and view classes below for info/warning
# messages.
LOG = infrastructure.get_logger(__name__)
@task_registry.set_equivalent_casa_task('hifa_wvrgcalflag')
@task_registry.set_casa_commands_comment(
    'Water vapour radiometer corrections are calculated for each antenna. The quality of the correction is assessed by '
    'comparing a phase gain solution calculated with and without the WVR correction. This requires calculation of a '
    'temporary phase gain on the bandpass calibrator, a temporary bandpass using that temporary gain, followed by phase'
    ' gains with the temporary bandpass, with and without the WVR correction. After that, some antennas are wvrflagged '
    '(so that their WVR corrections are interpolated), and then the quality of the correction recalculated.'
)
class Wvrgcalflag(basetask.StandardTaskTemplate):
    """
    Task that runs the WVR calibration together with antenna flagging.

    ``prepare`` only runs the calibration/flagging when enough antennas with
    WVR are present; ``analyse`` afterwards decides whether the resulting
    calibration may still be accepted.
    """
    Inputs = WvrgcalflagInputs

    def prepare(self):
        """
        Run WVR calibration and flagging, provided enough WVR antennas exist.

        :return: WvrgcalflagResults object; created with ``too_few_wvr=True``
            and an empty flagger result when the number or the fraction of
            antennas with WVR is below its threshold.
        """
        inputs = self.inputs

        # Number and fraction of antennas that have a WVR.
        nr_ants_wvr, frac_ants_wvr = self._identify_ants_with_wvr()

        # Are there too few WVR antennas (by number or by fraction) for the
        # calibration and flagging to be worth running?
        too_few = (nr_ants_wvr < inputs.ants_with_wvr_nr_thresh or
                   frac_ants_wvr < inputs.ants_with_wvr_thresh)

        if not too_few:
            # Enough WVR antennas: run calibration and flagging, and attach
            # the flagging result to the final result.
            flaggerresult = self._run_flagger()
            return WvrgcalflagResults(vis=inputs.vis,
                                      flaggerresult=flaggerresult)

        # Too few WVR antennas: build an empty result that is marked
        # accordingly.
        empty_flaggerresult = viewflaggers.MatrixFlaggerResults(vis=inputs.vis)
        result = WvrgcalflagResults(vis=inputs.vis,
                                    flaggerresult=empty_flaggerresult,
                                    too_few_wvr=True)

        # Report which of the two thresholds caused the skip.
        if nr_ants_wvr < inputs.ants_with_wvr_nr_thresh:
            msg = ("Number of antennas with WVR ({}) below the"
                   " threshold ({}), skipping WVR calibration.")
            LOG.info(msg.format(nr_ants_wvr, inputs.ants_with_wvr_nr_thresh))
        elif frac_ants_wvr < inputs.ants_with_wvr_thresh:
            msg = ("Fraction of antennas with WVR ({:.2f}) below the"
                   " threshold of {}, skipping WVR calibration.")
            LOG.info(msg.format(frac_ants_wvr, inputs.ants_with_wvr_thresh))

        return result

    def analyse(self, result):
        """
        Decide whether the WVR calibration in ``result`` may still be applied.

        The list of final calibrations is emptied when too few unflagged WVR
        antennas remain, or when the QA score is below the accept threshold.

        :param result: WvrgcalflagResults object produced by ``prepare``.
        :return: the same result object, possibly modified in place.
        """
        inputs = self.inputs
        flaggerresult = result.flaggerresult
        viewresult = flaggerresult.viewresult

        # Without any flagging view there is nothing to evaluate.
        if not (viewresult and viewresult.descriptions()):
            return result

        # Number and fraction of WVR antennas that are still unflagged.
        nr_ants_wvr, frac_ants_wvr = self._identify_unflagged_ants_with_wvr(result)
        dataresult = flaggerresult.dataresult

        if (nr_ants_wvr < inputs.ants_with_wvr_nr_thresh or
                frac_ants_wvr < inputs.ants_with_wvr_thresh):
            # Too few unflagged WVR antennas remain: drop the calfile from
            # the result so that it cannot be accepted into the context, and
            # mark the result accordingly.
            dataresult.final = []
            result.too_few_wvr_post_flagging = True

            # Warn about the threshold that was violated.
            if nr_ants_wvr < inputs.ants_with_wvr_nr_thresh:
                msg = ("After flagging, the number of unflagged antennas"
                       " with WVR ({}) is below the threshold of {}, skipping"
                       " WVR calibration.")
                LOG.warning(msg.format(nr_ants_wvr,
                                       inputs.ants_with_wvr_nr_thresh))
            elif frac_ants_wvr < inputs.ants_with_wvr_thresh:
                msg = ("After flagging, the fraction of unflagged antennas"
                       " with WVR ({:.2f}) is below the threshold of {};"
                       " wvrgcal file will not be applied.")
                LOG.warning(msg.format(frac_ants_wvr,
                                       inputs.ants_with_wvr_thresh))
            return result

        # Otherwise, if the QA score says that applying the WVR calibration
        # would make things worse, drop the calfile from the result so that
        # it cannot be accepted into the context.
        qa_score = dataresult.qa_wvr.overall_score
        if qa_score is not None and qa_score < inputs.accept_threshold:
            msg = ('wvrgcal file has qa score ({0}) below'
                   ' accept_threshold ({1}) and will not be'
                   ' applied')
            LOG.warning(msg.format(qa_score, inputs.accept_threshold))
            dataresult.final = []

        return result

    def _identify_ants_with_wvr(self):
        """
        Count the antennas with WVR.

        An antenna counts as having a WVR when its name does not start with
        'CM'.

        :return: (int, float): number of antennas with WVR, and that number
            as a fraction of all antennas in the MS.
        """
        # MS domain object for the current vis.
        ms = self.inputs.context.observing_run.get_ms(name=self.inputs.vis)
        antennas = ms.antennas

        nr_ants_wvr = sum(1 for antenna in antennas
                          if not antenna.name.startswith('CM'))
        frac_ants_wvr = nr_ants_wvr / float(len(antennas))

        return nr_ants_wvr, frac_ants_wvr

    def _identify_unflagged_ants_with_wvr(self, result):
        """
        Count the antennas with WVR that are not fully flagged in the view.

        :param result: WvrgcalflagResults object
        :return: (int, float): number of antennas, and that number as a
            fraction of all antennas in the MS.
        """
        # MS domain object for the current vis.
        ms = self.inputs.context.observing_run.get_ms(name=self.inputs.vis)

        # Only 1 flagging view is expected for Wvrgcalflag; use the first
        # description.
        flaggerresult = result.flaggerresult
        view = flaggerresult.last(flaggerresult.descriptions()[0])

        # Rows of the flag array where every entry along axis 1 is set are
        # translated to antenna IDs through the data of the first view axis.
        fully_flagged_rows = np.where(np.all(view.flag, axis=1))[0]
        antids_fully_flagged = view.axes[0].data[fully_flagged_rows]

        # Count the WVR antennas that are not fully flagged.
        nr_ants_wvr = 0
        for antenna in ms.antennas:
            if antenna.name.startswith('CM'):
                continue
            if antenna.id in antids_fully_flagged:
                continue
            nr_ants_wvr += 1

        frac_ants_wvr = nr_ants_wvr / float(len(ms.antennas))

        return nr_ants_wvr, frac_ants_wvr

    def _run_flagger(self):
        """
        Assemble and execute the flagger task.

        The flagger is a MatrixFlagger built around a WvrgcalflagData data
        task, a WvrgcalflagView view generator and a WvrgcalFlagSetter.

        :return: the result returned by executing the flagger task.
        """
        inputs = self.inputs

        # Data task: prepares/reads the data needed to create a flagging
        # view. Its inputs are taken one-to-one from the inputs of this task.
        passthrough = (
            'context', 'output_dir', 'vis', 'caltable', 'offsetstable',
            'hm_toffset', 'toffset', 'segsource', 'hm_tie', 'tie',
            'sourceflag', 'nsol', 'disperse', 'wvrflag', 'hm_smooth',
            'smooth', 'scale', 'maxdistm', 'minnumants', 'mingoodfrac',
            'refant', 'qa_intent', 'qa_bandpass_intent',
        )
        data_kwargs = {name: getattr(inputs, name) for name in passthrough}
        datatask = WvrgcalflagData(WvrgcalflagData.Inputs(**data_kwargs))

        # View generator: creates the views of the data that are the basis
        # for flagging.
        viewtask = WvrgcalflagView(context=inputs.context,
                                   flag_intent=inputs.flag_intent)

        # Flagsetter: Wvrgcalflag has its own dedicated flagsetter that does
        # not really set flags in the underlying data, but rather updates the
        # "pre-existing flags" parameter (wvrflag) of the datatask, so that a
        # subsequent run of the datatask takes the newly found flags into
        # account.
        flagsettertask = wvrgcalflagsetter.WvrgcalFlagSetter(
            wvrgcalflagsetter.WvrgcalFlagSetterInputs(
                context=inputs.context, table=inputs.vis, vis=inputs.vis,
                datatask=datatask))

        # Type of flagger to use.
        flagger_cls = viewflaggers.MatrixFlagger

        # Condense the input flagging parameters into a list of rules.
        rules = flagger_cls.make_flag_rules(
            flag_hi=inputs.flag_hi,
            fhi_limit=inputs.fhi_limit,
            fhi_minsample=inputs.fhi_minsample)

        # Flagger task wrapping the data, view and flagsetter tasks; the
        # flagger produces the final result.
        flaggertask = flagger_cls(flagger_cls.Inputs(
            context=inputs.context, output_dir=inputs.output_dir,
            vis=inputs.vis, datatask=datatask, viewtask=viewtask,
            flagsettertask=flagsettertask, rules=rules, niter=1,
            iter_datatask=True))

        # Execute inside a SuspendCapturingLogger so that warnings emitted by
        # the child task do not make it to the web log page; see CAS-7795.
        with logging.SuspendCapturingLogger():
            flaggerresult = self._executor.execute(flaggertask)

        return flaggerresult
class WvrgcalflagDataInputs(vdp.StandardInputs):
    """
    Inputs for the WvrgcalflagData worker task.

    All parameters are stored as given; no defaults other than None are
    defined here.
    """

    def __init__(self, context, output_dir=None, vis=None, caltable=None, offsetstable=None, hm_toffset=None,
                 toffset=None, segsource=None, hm_tie=None, tie=None, sourceflag=None, nsol=None, disperse=None,
                 wvrflag=None, hm_smooth=None, smooth=None, scale=None, maxdistm=None, minnumants=None,
                 mingoodfrac=None, refant=None, qa_intent=None, qa_bandpass_intent=None):
        # Pipeline inputs. vis is assigned ahead of the remaining
        # parameters, as other properties may depend on it.
        self.context = context
        self.vis = vis
        self.output_dir = output_dir

        # Data selection arguments first, then the solution parameters,
        # assigned in this exact order.
        remaining = (
            ('caltable', caltable),
            ('offsetstable', offsetstable),
            ('hm_toffset', hm_toffset),
            ('toffset', toffset),
            ('segsource', segsource),
            ('hm_tie', hm_tie),
            ('tie', tie),
            ('sourceflag', sourceflag),
            ('nsol', nsol),
            ('disperse', disperse),
            ('wvrflag', wvrflag),
            ('hm_smooth', hm_smooth),
            ('smooth', smooth),
            ('scale', scale),
            ('maxdistm', maxdistm),
            ('minnumants', minnumants),
            ('mingoodfrac', mingoodfrac),
            ('refant', refant),
            ('qa_intent', qa_intent),
            ('qa_bandpass_intent', qa_bandpass_intent),
        )
        for attr_name, attr_value in remaining:
            setattr(self, attr_name, attr_value)

    def as_dict(self):
        """
        Return the non-routine, public members of these inputs as a dict.

        WvrgcalflagDataInputs belongs to a worker task for Wvrgcalflag and
        does not need to define its own defaults for input parameters. As a
        result, however, its input parameters will not show up in the
        "inputs as a dictionary" object. The weblog renderer for Wvrgcalflag
        creates plots based on the result from WvrgcalflagData, and needs to
        access attributes from its inputs (expecting a dictionary). This
        override ensures all parameters show up in that dictionary.

        Members whose name starts with an underscore, and the members
        'context' and 'ms', are left out.

        TODO: could this override be removed after initializing all
        TODO: parameters as vdp parameters with default = None?
        """
        excluded = ['context', 'ms']
        members = inspect.getmembers(
            self, lambda member: not inspect.isroutine(member))

        inputs_dict = {}
        for member_name, member_value in members:
            if member_name.startswith('_') or member_name in excluded:
                continue
            inputs_dict[member_name] = member_value
        return inputs_dict
class WvrgcalflagData(basetask.StandardTaskTemplate):
    """
    Data task for Wvrgcalflag: runs Wvrgcal and caches parts of its QA
    result on the task instance for re-use by the next call.
    """
    Inputs = WvrgcalflagDataInputs

    def __init__(self, inputs):
        super(WvrgcalflagData, self).__init__(inputs)

        # State that has to survive across multiple flagging iterations: it
        # is filled from the Wvrgcal output on the first iteration and handed
        # back to Wvrgcal on subsequent iterations.
        self.bandpass_result = self.nowvr_result = None
        self.qa_spw = ''

    def prepare(self):
        """
        Execute Wvrgcal and return its result.

        :return: the Wvrgcal result, with an extra top-level ``nowvr_result``
            attribute.
        """
        inputs = self.inputs

        # Parameters handed to Wvrgcal unchanged.
        forwarded = (
            'context', 'output_dir', 'vis', 'offsetstable', 'hm_toffset',
            'toffset', 'segsource', 'hm_tie', 'tie', 'sourceflag', 'nsol',
            'disperse', 'wvrflag', 'hm_smooth', 'smooth', 'scale',
            'maxdistm', 'minnumants', 'mingoodfrac', 'refant', 'qa_intent',
            'qa_bandpass_intent',
        )
        wvrgcal_kwargs = {name: getattr(inputs, name) for name in forwarded}

        # Additional parameters for the wvrgcal calculation:
        # * accept_threshold is set to zero to ensure that the Wvrgcal result
        #   is always accepted into the local copy of the context, to force
        #   trigger the QA scoring determination.
        # * the locally cached "bandpass_result" and "nowvr_result": if these
        #   are not empty, then they were populated by a previous flagging
        #   iteration, and Wvrgcal will avoid running an exact repeat of the
        #   same task.
        # * the locally cached "qa_spw", to ensure that Wvrgcal will use the
        #   same spw list on subsequent iterations.
        wvrgcal_kwargs.update(
            accept_threshold=0.0,
            bandpass_result=self.bandpass_result,
            nowvr_result=self.nowvr_result,
            qa_spw=self.qa_spw)

        task = wvrgcal.Wvrgcal(wvrgcal.Wvrgcal.Inputs(**wvrgcal_kwargs))
        result = self._executor.execute(task, merge=True)

        # Remember bandpass result, nowvr result and qa_spw for the next call
        # to wvrgcal.
        qa_wvr = result.qa_wvr
        self.bandpass_result = qa_wvr.bandpass_result
        self.nowvr_result = qa_wvr.nowvr_result
        self.qa_spw = qa_wvr.qa_spw

        # The WVR renderer expects a top-level reference to the "No WVR"
        # result.
        result.nowvr_result = qa_wvr.nowvr_result

        return result

    def analyse(self, result):
        """Return the result unchanged."""
        return result
class WvrgcalflagView(object):
    """
    Callable that turns a Wvrgcal data result into flagging views.
    """

    def __init__(self, context, flag_intent=None):
        """
        Creates an WvrgcalflagView instance.

        :param context: pipeline context, used to look up the MS.
        :param flag_intent: comma-separated string of intents for which
            flagging views should be retrieved.
        """
        self.context = context
        self.flag_intent = flag_intent

    def __call__(self, dataresult):
        """
        When called, the WvrgcalflagView object retrieves flagging views
        from the provided dataresult (from Wvrgcal). For the views matching
        an intent specified by "self.flag_intent", it retrieves the view,
        updates the flags in this view to reflect the antennas listed in
        "dataresult.wvrflag", and then stores the resulting view in
        WvrgcalflagViewResults.

        dataresult -- WvrgcalResult object.

        Returns:
        WvrgcalflagViewResults object containing the flagging view.
        """
        # Initialize result structure.
        result = WvrgcalflagViewResults(vis=dataresult.vis)

        # As part of the data task, Wvrgcal will have created flagging views,
        # Wvrgcalflag will start from the views stored in dataresult.
        LOG.info('Retrieving flagging views for vis {0}'.format(
            dataresult.vis))

        # Get a list of available intents from the MS.
        ms = self.context.observing_run.get_ms(name=dataresult.vis)
        ms_intent_list = ms.intents

        # Test if the flagging intents are present in the MS. Availability is
        # accumulated over all requested intents, so that the summary warning
        # below is only emitted when none of them has data (rather than
        # whenever the last one in the list happens to be missing).
        flag_intent_list = self.flag_intent.split(',')
        any_intent_available = False
        for flag_intent in flag_intent_list:
            if any(flag_intent in intent for intent in ms_intent_list):
                any_intent_available = True
                LOG.info('flagging views will use %s data' % flag_intent)
            else:
                LOG.warning('no data for intent %s' % flag_intent)

        if not any_intent_available:
            LOG.warning('no data fits flag_intent {0}, no flagging will be'
                        ' done'.format(self.flag_intent))

        # From the QA section of the dataresult (from Wvrgcal) copy those views
        # for which the intent matches the flag_intent.
        for description in dataresult.qa_wvr.descriptions():
            for flag_intent in flag_intent_list:
                if 'Intent:%s' % flag_intent not in description:
                    continue

                # Translate list of WVR flagged antenna names back to list
                # of antenna IDs.
                # NOTE(review): ant_names is used as a mapping from antenna
                # ID to antenna name here - confirm against
                # commonhelpermethods.get_antenna_names.
                ant_names, _ = commonhelpermethods.get_antenna_names(ms)
                wvrflagids = []
                for ant_name in dataresult.wvrflag:
                    wvrflagids += [idx for idx in ant_names
                                   if ant_names[idx] == ant_name]

                # Set antennas specified by the wvrflag parameter in the
                # flagging image to show that these data are already 'flagged'
                # (i.e. interpolated).
                image = dataresult.qa_wvr.last(description)
                image.setflags(axisname='Antenna', indices=wvrflagids)

                # Add updated view to result.
                result.addview(description, image)

        return result