"""
The flagdeterbase module provides base classes for deterministic flagging
tasks in the pipeline.
To test these classes, register some data with the pipeline using ImportData,
then execute:
import pipeline
vis = [ '<MS name>' ]
# Create a pipeline context and register some data
context = pipeline.Pipeline().context
inputs = pipeline.tasks.ImportData.Inputs(context, vis=vis)
task = pipeline.tasks.ImportData(inputs)
results = task.execute(dry_run=False)
results.accept(context)
# Execute the flagging task
inputs = pipeline.tasks.flagging.FlagDeterBase.Inputs(context,\
autocorr=True, shadow=True, scan=True, scannumber='4,5,8',\
intents='*AMPLI*', edgespw=True, fracspw=0.1)
task = pipeline.tasks.flagging.FlagDeterBase(inputs)
result = task.execute(dry_run=True)
In other words, create a context; create the inputs (which sets the public
variables to the correct values and creates the temporary flag command file);
convert the class arguments to arguments for the CASA task flagdata; create
the FlagDeterBase() instance; perform FlagDeterBase.analyse(); and execute the
class.
"""
import os
import string
from casatasks.private import flaghelper
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
import pipeline.infrastructure.utils as utils
# Names exported by "from flagdeterbase import *".
__all__ = ['FlagDeterBase', 'FlagDeterBaseInputs', 'FlagDeterBaseResults']

# Logger for this module, obtained from the pipeline infrastructure.
LOG = infrastructure.get_logger(__name__)
class FlagDeterBaseResults(basetask.Results):
    """
    Results of a deterministic flagging task.

    :param summaries: flagdata summary dictionaries, one per summary agent,
        in the order in which the agents ran. Each dictionary is read for its
        'name', 'flagged' and 'total' entries; the 'flagged' counts are
        cumulative over the preceding agents.
    :param flagcmds: the flagging commands that were passed to flagdata.
    """

    def __init__(self, summaries, flagcmds):
        super(FlagDeterBaseResults, self).__init__()
        self.summaries = summaries
        self._flagcmds = flagcmds

    def flagcmds(self):
        """Return the flagging commands that were passed to flagdata."""
        return self._flagcmds

    def merge_with_context(self, context):
        """Deterministic flagging results add nothing to the context."""
        # nothing to do
        pass

    def __repr__(self):
        # Step through the summary list and print a few things.
        # SUBTRACT flag counts from previous agents, because the counts are
        # cumulative.
        s = 'Deterministic flagging results:\n'
        previous_flagged = 0
        for idx, summary in enumerate(self.summaries):
            cumulative_flagged = int(summary['flagged'])
            totalcount = int(summary['total'])
            # From the second summary onwards, subtract the count of the
            # previous one (previous_flagged is 0 for the first summary).
            flagcount = cumulative_flagged - previous_flagged
            previous_flagged = cumulative_flagged
            # Guard against an empty selection (total of 0), which would
            # otherwise raise ZeroDivisionError.
            percent = 100.0 * flagcount / totalcount if totalcount else 0.0
            s += '\tSummary %s (%s) : Flagged : %s out of %s (%0.2f%%)\n' % (
                idx, summary['name'], flagcount, totalcount, percent)
        return s
class FlagDeterBase(basetask.StandardTaskTemplate):
    """
    FlagDeterBase is the base class for deterministic flagging. It can perform
    many different types of deterministic flagging:

    - Autocorrelations
    - Shadowed antennas
    - Scan and intents
    - Edge channels
    - Online flags
    - Template flags

    FlagDeterBase outputs flagdata flagging commands to a temporary ASCII
    file located in the pipeline working directory; flagdata is then invoked
    using this command file as input.
    """
    # link the accompanying inputs to this task
    Inputs = FlagDeterBaseInputs

    def prepare(self):
        """
        Prepare and execute a flagdata flagging job appropriate to the
        task inputs.

        This method generates, overwriting if necessary, an ASCII file
        containing flagdata flagging commands. A flagdata task is then
        executed, using this ASCII file as inputs.

        :return: results holding the flagdata summaries (in agent order) and
            the flagging commands that were written to the file.
        :rtype: :class:~`FlagDeterBaseResults`
        """
        # create a local alias for inputs, so we're not saying 'self.inputs'
        # everywhere
        inputs = self.inputs

        # get the flagdata commands, one per line of the flagdata input file
        flag_cmds = self._get_flag_commands()
        flag_str = '\n'.join(flag_cmds)

        # write the flag commands to the file. flag_str is a single string,
        # so write() is the appropriate call (writelines() would iterate it
        # character by character).
        with open(inputs.inpfile, 'w') as stream:
            stream.write(flag_str)

        # to save inspecting the file, also log the flag commands
        LOG.debug('Flag commands for %s:\n%s', inputs.vis, flag_str)

        # Map the pipeline inputs to a dictionary of CASA task arguments
        task_args = inputs.to_casa_args()

        # create and execute a flagdata job using these task arguments
        job = casa_tasks.flagdata(**task_args)
        summary_dict = self._executor.execute(job)

        # NOTE(review): the executor presumably returns None when the job is
        # not actually run (e.g. execute(dry_run=True), as in the module
        # docstring example); treat that as "no summaries" rather than
        # failing on .values(). Confirm against the executor.
        if summary_dict is None:
            summary_dict = {}

        agent_summaries = {v['name']: v for v in summary_dict.values()}

        # Summaries are reported in the order in which the agents run: the
        # flagdata counts are cumulative, and FlagDeterBaseResults derives
        # per-agent counts by subtracting the previous summary. 'scans' is
        # listed (after 'shadow', matching _get_flag_commands) so that flags
        # from the scan-number command are not attributed to the next agent.
        ordered_agents = ['before', 'anos', 'intents', 'qa0', 'qa2', 'online', 'template', 'autocorr',
                          'shadow', 'scans', 'pointing', 'edgespw', 'clip', 'quack',
                          'baseband']
        summary_reps = [agent_summaries[agent]
                        for agent in ordered_agents
                        if agent in agent_summaries]

        # return the results object, which will be used for the weblog
        return FlagDeterBaseResults(summary_reps, flag_cmds)

    def analyse(self, results):
        """
        Analyse the results of the flagging operation.

        This method does not perform any analysis, so the results object is
        returned exactly as-is, with no data massaging or results items
        added. If additional statistics needed to be calculated based on the
        post-flagging state, this would be a good place to do it.

        :rtype: :class:~`FlagDeterBaseResults`
        """
        return results

    def _get_flag_commands(self):
        """
        Get the flagging commands for flagdata.

        This method analyses the inputs associated with this instance, parsing
        the input parameters and converting them into a list of matching
        flagdata flagging commands. Each group of flagging commands is
        followed by a named summary command, and a 'before' summary is
        prepended whenever any command is generated. The caller joins the
        list into the text written to the flagdata input file.

        :return: flagdata commands, one command per element.
        :rtype: list[str]
        """
        # create a local variable for the inputs associated with this instance
        inputs = self.inputs

        # the empty list which will hold the flagging commands
        flag_cmds = []

        # Flag by intent? These must be separated due to the way agent
        # flagging works: one manual command per original intent.
        if inputs.scan and inputs.intents != '':
            for intent in inputs.intents.split(','):
                for intent_item in inputs.ms.get_original_intent(intent):
                    flag_cmds.append("mode='manual' intent='%s' reason='intents'" % intent_item)
            flag_cmds.append("mode='summary' name='intents'")

        # Apply online flags?
        if inputs.online:
            if not os.path.exists(inputs.fileonline):
                LOG.warning('Online flag file \'%s\' was not found. Online '
                            'flagging for %s disabled.' % (inputs.fileonline,
                                                            inputs.ms.basename))
            elif inputs.qa0 or inputs.qa2:
                # Split the online flags so that the QA0 / QA2 flags get
                # their own summaries.
                # NOTE(review): with qa0 set but qa2 unset, QA2 commands are
                # dropped entirely (neither reported as 'qa2' nor applied as
                # 'online'), whereas with both unset they are applied as
                # 'online'. Confirm this is intended.
                cmdlist = self._read_flagfile(inputs.fileonline)
                # QA0 flag
                if inputs.qa0:
                    flag_cmds.extend(cmd for cmd in cmdlist if 'QA0' in cmd)
                    flag_cmds.append("mode='summary' name='qa0'")
                # QA2 flag
                if inputs.qa2:
                    flag_cmds.extend(cmd for cmd in cmdlist if 'QA2' in cmd)
                    flag_cmds.append("mode='summary' name='qa2'")
                # All other online flags
                flag_cmds.extend(cmd for cmd in cmdlist
                                 if 'QA0' not in cmd and 'QA2' not in cmd)
                flag_cmds.append("mode='summary' name='online'")
            else:
                flag_cmds.extend(self._read_flagfile(inputs.fileonline))
                flag_cmds.append("mode='summary' name='online'")

        # flag template?
        if inputs.template:
            if not os.path.exists(inputs.filetemplate):
                LOG.warning('Template flag file \'%s\' was not found. Template '
                            'flagging for %s disabled.' % (inputs.filetemplate,
                                                            inputs.ms.basename))
            else:
                flag_cmds.extend(self._read_flagfile(inputs.filetemplate))
                flag_cmds.append("mode='summary' name='template'")

        # Flag autocorrelations?
        if inputs.autocorr:
            flag_cmds.append("mode='manual' autocorr=True reason='autocorr'")
            flag_cmds.append("mode='summary' name='autocorr'")

        # Flag shadowed antennas?
        if inputs.shadow:
            flag_cmds.append("mode='shadow' tolerance=%s reason='shadow'" % inputs.tolerance)
            flag_cmds.append("mode='summary' name='shadow'")

        # Flag according to scan numbers?
        if inputs.scan and inputs.scannumber != '':
            flag_cmds.append("mode='manual' scan='%s' reason='scans'" % inputs.scannumber)
            flag_cmds.append("mode='summary' name='scans'")

        # Flag spectral window edge channels?
        if inputs.edgespw:
            to_flag = self._get_edgespw_cmds()
            if to_flag:
                spw_arg = ','.join(to_flag)
                flag_cmds.append("mode='manual' spw='%s' reason='edgespw'" % spw_arg)
                flag_cmds.append("mode='summary' name='edgespw'")

        # summarise the state before flagging rather than assuming the initial
        # state is unflagged
        if flag_cmds:
            flag_cmds.insert(0, "mode='summary' name='before'")

        return flag_cmds

    def _get_autocorr_cmd(self):
        """Return a flagdata command that flags autocorrelations."""
        return "mode='manual' autocorr=True"

    def verify_spw(self, spw):
        """
        Verify that the given spw should be flagged, raising a ValueError if
        it should not.

        Checks in this function should be generic. Observatory-dependent tests
        should be added by extending FlagDeterBase and overriding this
        method.

        :raises ValueError: if the spw has no data description, or its number
            of correlations is not 1, 2 or 4.
        """
        # Get the data description for this spw
        dd = self.inputs.ms.get_data_description(spw=spw)
        if dd is None:
            raise ValueError('Missing data description for spw %s ' % spw.id)

        ncorr = len(dd.corr_axis)
        if ncorr not in (1, 2, 4):
            raise ValueError('Wrong number of correlations %s for spw %s '
                             '' % (ncorr, spw.id))

    def _get_edgespw_cmds(self):
        """
        Return flagging commands to flag the outermost X channels on each side
        of each science spectral window, where X is computed from the total
        number of channels in the spw, and the "fraction of channels to flag"
        as returned by the "get_fracspw" function.

        :return: list of flagdata spw selections ('<spw>:<l>~<r>;<l>~<r>') as
            strings.
        :rtype: list[str]
        """
        inputs = self.inputs

        # to_flag is the list to which flagging commands will be appended
        to_flag = []

        # loop over the spectral windows, generate a flagging command for each
        # spw in the ms. Calling get_spectral_windows() with no arguments
        # returns just the science windows, which is exactly what we want.
        for spw in inputs.ms.get_spectral_windows():
            try:
                # test that this spw should be flagged by assessing number of
                # correlations, TDM/FDM mode etc.
                self.verify_spw(spw)
            except ValueError as e:
                # This spw should not be or is incapable of being flagged by the
                # fraction based TDM spw flagging heuristic in this method.
                LOG.debug(str(e))
                continue

            LOG.debug('Spectral window {} is a TDM spectral window, proceeding with TDM spw edge flagging heuristics.'
                      ''.format(spw.id))

            # Get fraction of total number of spw channels that are to be
            # flagged on each side of the spw.
            # Note: this function is presently only implemented in subclasses
            # FlagDeterALMA and FlagDeterALMASingleDish.
            fracspw = self.get_fracspw(spw)

            # Check whether the fraction is too high, such that flagging the
            # corresponding number of channels on each side of the spw would
            # result in a fully flagged spw. If so, then skip this spw, and log
            # a debug message.
            nchan_for_frac = int(utils.round_half_up(fracspw * spw.num_channels))
            if 2 * nchan_for_frac >= spw.num_channels:
                LOG.debug('Skipping fraction-based TDM edge channel flagging heuristic for spw %s, since the specified '
                          'fraction %s would have resulted in flagging all %s channels of the spw.'
                          '' % (spw.id, fracspw, spw.num_channels))
                continue

            # calculate the channel ranges to flag. No need to calculate the
            # left minimum as it is always channel 0.
            l_max = nchan_for_frac - 1
            r_min = spw.num_channels - nchan_for_frac
            r_max = spw.num_channels - 1

            # state the spw and channels to flag in flagdata format, adding
            # the statement to the list of flag commands
            cmd = '{0}:0~{1};{2}~{3}'.format(spw.id, l_max, r_min, r_max)
            to_flag.append(cmd)

        return to_flag

    def _add_file(self, filename):
        """
        Read and return the contents of a file or list of files.

        Trailing newlines are stripped from each file; when a list of files is
        given, their contents are separated by a single newline so that the
        last command of one file is not fused with the first of the next.
        """
        # If the input is a list of flagging command file names, call this
        # function recursively. Otherwise, read in the file and return its
        # contents
        if isinstance(filename, list):
            return '\n'.join([self._add_file(f) for f in filename])

        with open(filename) as stream:
            return stream.read().rstrip('\n')

    def _read_flagfile(self, filename):
        """
        Return the flagging commands read from a flag command file.

        Comment lines (starting with '#') and whitespace-only entries are
        dropped. Returns an empty list, with a warning, if the file does not
        exist.
        """
        if not os.path.exists(filename):
            LOG.warning('%s does not exist' % filename)
            return []

        # strip out comments and empty lines to leave the real commands.
        # This is so we can compare the number of valid commands to the number
        # of commands specified in the file and complain if they differ
        return [cmd for cmd in flaghelper.readFile(filename)
                if not cmd.strip().startswith('#')
                and not all(c in string.whitespace for c in cmd)]