import os
import string
from casatasks.private import flaghelper
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.api as api
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
# the logger for this module
LOG = infrastructure.get_logger(__name__)
__all__ = [
'FlagTargetsALMA',
'FlagTargetsALMAInputs',
'FlagTargetsALMAResults'
]
"""
Flag ALMA target science target data.
"""
[docs]class FlagTargetsALMAInputs(vdp.StandardInputs):
"""
FlagTargetsALMA Inputs manages the inputs for the FlagTargetALMA task.
.. py:attribute:: context
the (:class:`~pipeline.infrastructure.launcher.Context`) holding all
pipeline state
.. py:attribute:: vis
a string or list of strings containing the MS name(s) on which to
operate
.. py:attribute:: output_dir
the directory to which pipeline data should be sent
.. py:attribute:: flagbackup
a boolean indicating whether whether existing flags should be backed
up before new flagging begins.
.. py:attribute:: template
A boolean indicating whether flagging templates are to be applied.
.. py:attribute:: filetemplate
The filename of the ASCII file that contains the flagging template
"""
flagbackup = vdp.VisDependentProperty (default=False)
template = vdp.VisDependentProperty(default=True)
@vdp.VisDependentProperty
def filetemplate(self):
vis_root = os.path.splitext(self.vis)[0].split('_target')[0]
return vis_root + '.flagtargetstemplate.txt'
@filetemplate.convert
def filetemplate(self, value):
if isinstance(value, str):
return list(value.replace('[', '').replace(']', '').replace("'", "").split(','))
else:
return value
@vdp.VisDependentProperty
def inpfile(self):
vis_root = os.path.splitext(self.vis)[0].split('_target')[0]
return os.path.join(self.output_dir, vis_root + '.flagtargetscmds.txt')
def __init__(self, context, vis=None, output_dir=None, flagbackup=None, template=None, filetemplate=None):
super(FlagTargetsALMAInputs, self).__init__()
# pipeline inputs
self.context = context
# vis must be set first, as other properties may depend on it
self.vis = vis
self.output_dir = output_dir
self.flagbackup = flagbackup
self.template = template
self.filetemplate = filetemplate
[docs] def to_casa_args(self):
"""
Translate the input parameters of this class to task parameters
required by the CASA task flagdata. The returned object is a
dictionary of flagdata arguments as keyword/value pairs.
:rtype: dict
"""
return {'vis': self.vis,
'mode': 'list',
'action': 'apply',
'inpfile': self.inpfile,
'flagbackup': self.flagbackup}
# tell the infrastructure to prefentially apply the targets
# flags to the split MS(s)
api.ImagingMeasurementSetsPreferred.register(FlagTargetsALMAInputs)
[docs]class FlagTargetsALMAResults(basetask.Results):
def __init__(self, summaries, flagcmds):
super(FlagTargetsALMAResults, self).__init__()
self.summaries = summaries
self._flagcmds = flagcmds
[docs] def flagcmds(self):
return self._flagcmds
[docs] def merge_with_context(self, context):
# nothing to do
pass
def __repr__(self):
# Step through the summary list and print a few things.
# SUBTRACT flag counts from previous agents, because the counts are
# cumulative.
s = 'Target flagging results:\n'
for idx in range(0, len(self.summaries)):
flagcount = int(self.summaries[idx]['flagged'])
totalcount = int(self.summaries[idx]['total'])
# From the second summary onwards, subtract counts from the previous
# one
if idx > 0:
flagcount = flagcount - int(self.summaries[idx-1]['flagged'])
s += '\tSummary %s (%s) : Flagged : %s out of %s (%0.2f%%)\n' % (
idx, self.summaries[idx]['name'], flagcount, totalcount,
100.0*flagcount/totalcount)
return s
[docs]@task_registry.set_equivalent_casa_task('hifa_flagtargets')
class FlagTargetsALMA(basetask.StandardTaskTemplate):
"""
FlagTargetsALMA is a class for target flagging. It can perform
- Template flags
"""
# link the accompanying inputs to this task
Inputs = FlagTargetsALMAInputs
[docs] def prepare(self):
"""
Prepare and execute a flagdata flagging job appropriate to the
task inputs.
"""
# create a local alias for inputs, so we're not saying 'self.inputs'
# everywhere
inputs = self.inputs
# get the flagdata command string required for the results
flag_cmds = self._get_flag_commands()
flag_str = '\n'.join(flag_cmds)
# write the flag commands to the file
with open(inputs.inpfile, 'w') as stream:
stream.writelines(flag_str)
# to save inspecting the file, also log the flag commands
LOG.debug('Flag commands for %s:\n%s', inputs.vis, flag_str)
# Map the pipeline inputs to a dictionary of CASA task arguments
task_args = inputs.to_casa_args()
# create and execute a flagdata job using these task arguments
job = casa_tasks.flagdata(**task_args)
summary_dict = self._executor.execute(job)
agent_summaries = dict((v['name'], v) for v in summary_dict.values())
ordered_agents = ['before', 'template']
summary_reps = [agent_summaries[agent]
for agent in ordered_agents
if agent in agent_summaries]
# return the results object, which will be used for the weblog
return FlagTargetsALMAResults(summary_reps, flag_cmds)
[docs] def analyse(self, results):
"""
Analyse the results of the flagging operation.
This method does not perform any analysis, so the results object is
returned exactly as-is, with no data massaging or results items
added. If additional statistics needed to be calculated based on the
post-flagging state, this would be a good place to do it.
"""
return results
def _get_flag_commands(self):
"""
Get the flagging commands as a string suitable for flagdata.
"""
# create a local variable for the inputs associated with this instance
inputs = self.inputs
# create list which will hold the flagging commands
flag_cmds = ["mode='summary' name='before'"]
# flag template?
if inputs.template:
if not os.path.exists(inputs.filetemplate):
LOG.warning('Template flag file \'%s\' for \'%s\' not found.'
% (inputs.filetemplate, inputs.ms.basename))
else:
flag_cmds.extend(self._read_flagfile(inputs.filetemplate))
flag_cmds.append("mode='summary' name='template'")
return flag_cmds
@staticmethod
def _read_flagfile(filename):
if not os.path.exists(filename):
LOG.warning('%s does not exist' % filename)
return []
# strip out comments and empty lines to leave the real commands.
# This is so we can compare the number of valid commands to the number
# of commands specified in the file and complain if they differ
return [cmd for cmd in flaghelper.readFile(filename)
if not cmd.strip().startswith('#')
and not all(c in string.whitespace for c in cmd)]