Source code for pipeline.hif.tasks.mstransform.mstransform
import operator
import os
import shutil
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.tablereader as tablereader
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
# Module-level logger, obtained from the pipeline infrastructure and named
# after this module via __name__.
LOG = infrastructure.get_logger(__name__)
# Define the minimum set of parameters required to split out
# the TARGET data from the complete and fully calibrated
# original MS. Other parameters will be added here as more
# capabilities are added to hif_mstransform.
class MstransformInputs(vdp.StandardInputs):
    """Inputs for hif_mstransform.

    Holds the selection (field, intent, spw) and averaging (chanbin, timebin)
    parameters used to split data out of ``vis`` into ``outputvis``. Each
    parameter is declared as a ``vdp.VisDependentProperty``; a parameter
    passed as None to the constructor presumably falls back to the default
    declared here (NOTE(review): that is vdp behaviour, not visible in this
    module - confirm).
    """

    @vdp.VisDependentProperty
    def outputvis(self):
        """Default output name: ``vis`` with its extension replaced by '_target.ms'."""
        vis_root = os.path.splitext(self.vis)[0]
        return vis_root + '_target.ms'

    @outputvis.convert
    def outputvis(self, value):
        """Convert a string value into a list of output names.

        Square brackets and single quotes are removed and the remainder is
        split on commas, so a stringified list such as "['a.ms', 'b.ms']"
        becomes ['a.ms', 'b.ms']. Non-string values are returned unchanged.
        """
        if isinstance(value, str):
            cleaned = value.replace('[', '').replace(']', '').replace("'", "")
            # Strip each entry: a stringified list separates its items with
            # ', ', which would otherwise leave a leading space on every
            # name after the first.
            return [name.strip() for name in cleaned.split(',')]
        return value

    # By default find all the fields with TARGET intent
    @vdp.VisDependentProperty
    def field(self):
        """Default field selection: fields observed with the requested intent.

        Returns a comma-separated string of field names, or of field ids when
        the names alone would be ambiguous. Entries are sorted so that the
        same selection always produces the same string.
        """
        # Find fields in the current ms that have been observed
        # with the desired intent
        fields = self.ms.get_fields(intent=self.intent)

        # When an observation is terminated the final scan can be a TARGET scan but may
        # not contain any TARGET data for the science spectral windows, e.g. there is square
        # law detector data but nothing else. The following code removes the final field if
        # it does not contain data for the requested spectral windows - the science spectral
        # windows by default.
        if fields:
            fields_by_id = sorted(fields, key=operator.attrgetter('id'))
            last_field = fields_by_id[-1]

            # While we're here, remove the final field if it is not linked with a source.
            # This should not occur since the CAS-9499 tablereader bug was fixed, but
            # check anyway.
            if getattr(last_field, 'source', None) is None:
                fields.remove(last_field)
                LOG.info('Truncated observation detected (no source for field): '
                         'removing Field {!s}'.format(last_field.id))
            else:
                # .. and then drop it if it does not contain any of the
                # requested spectral windows. This should prevent
                # aborted scans from being split into the TARGET
                # measurement set.
                requested_spws = set(self.ms.get_spectral_windows(self.spw))
                if last_field.valid_spws.isdisjoint(requested_spws):
                    LOG.info('Truncated observation detected (missing spws): '
                             'removing Field {!s}'.format(last_field.id))
                    fields.remove(last_field)

        unique_field_names = {f.name for f in fields}
        field_ids = {f.id for f in fields}

        # Fields with different intents may have the same name. Check for this
        # and return the ids rather than the names if necessary to resolve any
        # ambiguities. Both are sets, so sort them: set iteration order is not
        # stable between runs and would make the selection string vary.
        if len(unique_field_names) == len(field_ids):
            return ','.join(sorted(unique_field_names))
        return ','.join([str(i) for i in sorted(field_ids)])

    # Select TARGET data by default
    intent = vdp.VisDependentProperty(default='TARGET')

    # Find all the spws with TARGET intent. These may be a subset of the
    # science spws which include calibration spws.
    @vdp.VisDependentProperty
    def spw(self):
        """Default spw selection: science spws sharing an intent with ``intent``.

        Returns the matching spw ids as a comma-separated string.
        """
        wanted_intents = set(self.intent.split(','))
        science_spws = self.ms.get_spectral_windows(science_windows_only=True)
        return ','.join([str(spw.id) for spw in science_spws
                         if spw.intents.intersection(wanted_intents)])

    @spw.convert
    def spw(self, value):
        """Restrict a user-supplied spw selection to matching science spws.

        ``value`` is passed to ``get_spectral_windows`` as ``task_arg``; of
        the science spws returned, only those sharing an intent with
        ``intent`` are kept. Returns their ids as a comma-separated string.
        """
        wanted_intents = set(self.intent.split(','))
        science_spws = self.ms.get_spectral_windows(task_arg=value, science_windows_only=True)
        return ','.join([str(spw.id) for spw in science_spws
                         if spw.intents.intersection(wanted_intents)])

    chanbin = vdp.VisDependentProperty(default=1)
    timebin = vdp.VisDependentProperty(default='0s')

    def __init__(self, context, output_dir=None, vis=None, outputvis=None, field=None, intent=None, spw=None,
                 chanbin=None, timebin=None):
        """Store the task parameters on this inputs object.

        The assignment order below is kept as in the original code; context
        and vis are set before the properties whose defaults read them.
        """
        super(MstransformInputs, self).__init__()

        # set the properties to the values given as input arguments
        self.context = context
        self.vis = vis
        self.output_dir = output_dir
        self.outputvis = outputvis
        self.field = field
        self.intent = intent
        self.spw = spw
        self.chanbin = chanbin
        self.timebin = timebin

    def to_casa_args(self):
        """Return the CASA task argument dictionary for these inputs.

        Starts from the parent class dictionary, forces ``datacolumn`` and
        ``reindex``, and sets the ``chanaverage`` / ``timeaverage`` switches
        from ``chanbin`` and ``timebin``.
        """
        # Get parameter dictionary.
        d = super(MstransformInputs, self).to_casa_args()

        # Force the data column to be 'corrected' and the
        # new (with casa 4.6) reindex parameter to be False
        d['datacolumn'] = 'corrected'
        d['reindex'] = False

        # NOTE(review): this comparison assumes a scalar chanbin; confirm
        # whether list-valued chanbin needs to be supported here.
        d['chanaverage'] = self.chanbin > 1

        # Decide on time averaging from the numeric magnitude of timebin.
        # Comparing the strings directly is lexicographic, so for example
        # '0.5s' sorts below '0s' and would wrongly disable averaging.
        timebin = self.timebin
        unit_letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
        try:
            magnitude = float(str(timebin).strip().rstrip(unit_letters))
        except ValueError:
            # Not of the form <number><unit>: keep the historical comparison.
            d['timeaverage'] = timebin > '0s'
        else:
            d['timeaverage'] = magnitude > 0

        return d
@task_registry.set_equivalent_casa_task('hif_mstransform')
class Mstransform(basetask.StandardTaskTemplate):
    """Run CASA mstransform for the given inputs and import the MS it writes."""

    Inputs = MstransformInputs

    def prepare(self):
        """Execute mstransform and return a MstransformResults for it.

        The result object is created first, then the CASA job is built from
        the inputs and executed, and finally the XML files are copied across.
        """
        task_inputs = self.inputs

        # Result container describing the input and output measurement sets.
        result = MstransformResults(vis=task_inputs.vis, outputvis=task_inputs.outputvis)

        # Build and run the CASA job.
        job = casa_tasks.mstransform(**task_inputs.to_casa_args())
        self._executor.execute(job)

        # Bring the requisite XML files over from the original MS.
        self._copy_xml_files(task_inputs.vis, task_inputs.outputvis)

        return result

    def analyse(self, result):
        """Read the output MS, if it exists, and attach it to the result.

        Each imported measurement set is given the session of the source
        measurement set and flagged as an imaging MS. When the output MS is
        missing the result is returned without any measurement sets.
        """
        output_path = result.outputvis

        # Nothing to import when the output MS was not written.
        if not os.path.exists(output_path):
            LOG.debug('Error creating target MS %s', os.path.basename(output_path))
            return result

        # Read the new measurement set from its absolute path.
        new_run = tablereader.ObservingRunReader.get_observing_run(os.path.abspath(output_path))

        # Give every imported MS the session of the source measurement set.
        for new_ms in new_run.measurement_sets:
            session = self.inputs.ms.session
            LOG.debug('Setting session to %s for %s', session, new_ms.basename)
            new_ms.session = session
            new_ms.is_imaging_ms = True

        result.mses.extend(new_run.measurement_sets)
        return result

    @staticmethod
    def _copy_xml_files(vis, outputvis):
        """Copy SpectralWindow.xml and DataDescription.xml from vis to outputvis.

        A file is copied only when it exists in ``vis`` and ``outputvis``
        itself exists; otherwise it is silently skipped.
        """
        for xml_name in ('SpectralWindow.xml', 'DataDescription.xml'):
            source_path = os.path.join(vis, xml_name)
            target_path = os.path.join(outputvis, xml_name)

            if not (os.path.exists(source_path) and os.path.exists(outputvis)):
                continue

            LOG.info('Copying %s from original MS to target MS', xml_name)
            LOG.trace('Copying %s: %s to %s', xml_name, source_path, target_path)
            shutil.copyfile(source_path, target_path)
class MstransformResults(basetask.Results):
    """Result of a hif_mstransform run.

    Attributes:
        vis: the input measurement set passed to the constructor.
        outputvis: the output measurement set passed to the constructor.
        mses: measurement set objects to add to the context on merge;
            empty on construction.
    """

    def __init__(self, vis, outputvis):
        super(MstransformResults, self).__init__()
        self.vis = vis
        self.outputvis = outputvis
        self.mses = []

    def merge_with_context(self, context):
        """Register the new measurement sets with the pipeline context.

        Adds every MS in ``mses`` to ``context.observing_run``, creates the
        target flagging template file if any of them is an imaging MS, and
        registers each MS with ``context.callibrary``. Logs an error and
        does nothing when ``mses`` is empty.
        """
        # Check for an output vis
        if not self.mses:
            LOG.error('No hif_mstransform results to merge')
            return

        target = context.observing_run

        # Adding mses to context
        for ms in self.mses:
            LOG.info('Adding {} to context'.format(ms.name))
            target.add_measurement_set(ms)

        # Create targets flagging template file if it does not already exist.
        # The file name depends only on self.vis, so a single call covers
        # every imaging MS in this result.
        if any(ms.is_imaging_ms for ms in self.mses):
            template_flagsfile = os.path.join(
                self.inputs['output_dir'],
                os.path.splitext(os.path.basename(self.vis))[0] + '.flagtargetstemplate.txt')
            self._make_template_flagfile(template_flagsfile,
                                         'User flagging commands file for the imaging pipeline')

        # Initialize callibrary
        for ms in self.mses:
            calto = callibrary.CalTo(vis=ms.name)
            LOG.info('Registering {} with callibrary'.format(ms.name))
            context.callibrary.add(calto, [])

    def _make_template_flagfile(self, outfile, titlestr):
        """Write the flagging template to ``outfile`` unless it already exists.

        ``titlestr`` replaces the ___TITLESTR___ placeholder in
        FLAGGING_TEMPLATE_HEADER. An existing file is never overwritten.
        """
        if not os.path.exists(outfile):
            template_text = FLAGGING_TEMPLATE_HEADER.replace('___TITLESTR___', titlestr)
            with open(outfile, 'w') as f:
                # The template is one string, so write it in a single call.
                f.write(template_text)

    def __str__(self):
        # Format the Mstransform results.
        s = 'MstransformResults:\n'
        s += '\tOriginal MS {vis} transformed to {outputvis}\n'.format(
            vis=os.path.basename(self.vis),
            outputvis=os.path.basename(self.outputvis))
        return s

    def __repr__(self):
        # Use the real class name here (previously misspelt 'MstranformResults').
        return 'MstransformResults({}, {})'.format(os.path.basename(self.vis), os.path.basename(self.outputvis))
# Text written to a newly created flagging template file. The ___TITLESTR___
# placeholder is replaced with a title string before the text is written
# (see MstransformResults._make_template_flagfile).
FLAGGING_TEMPLATE_HEADER = '''#
# ___TITLESTR___
#
# Examples
# Note: Do not put spaces inside the reason string !
#
# mode='manual' correlation='YY' antenna='DV01;DV08;DA43;DA48&DV23' spw='21:1920~2880' autocorr=False reason='bad_channels'
# mode='manual' spw='25:0~3;122~127' reason='stage8_2'
# mode='manual' antenna='DV07' timerange='2013/01/31/08:09:55.248~2013/01/31/08:10:01.296' reason='quack'
#
'''