Source code for pipeline.hif.tasks.transformimagedata.transformimagedata
import os
import shutil
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.tablereader as tablereader
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.mstransform import mssplit
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
LOG = infrastructure.get_logger(__name__)
[docs]class TransformimagedataResults(basetask.Results):
def __init__(self, vis, outputvis):
super(TransformimagedataResults, self).__init__()
self.vis = vis
self.outputvis = outputvis
self.ms = None
[docs] def merge_with_context(self, context):
# Check for an output vis
if not self.ms:
LOG.error('No hif_transformimagedata results to merge')
return
target = context.observing_run
parentms = None
#if self.vis == self.outputvis:
# The parent MS has been removed.
if not os.path.exists(self.vis):
for index, ms in enumerate(target.get_measurement_sets()):
#if ms.name == self.outputvis:
if ms.name == self.vis:
parentms = index
break
if self.ms:
if parentms is not None:
LOG.info('Replace {} in context'.format(self.ms.name))
del target.measurement_sets[parentms]
target.add_measurement_set(self.ms)
else:
LOG.info('Adding {} to context'.format(self.ms.name))
target.add_measurement_set(self.ms)
# Remove original measurement set from context
context.observing_run.measurement_sets.pop(0)
for i in range(0, len(context.clean_list_pending)):
outvisname = context.output_dir + '/' + os.path.basename(self.outputvis)
context.clean_list_pending[i]['heuristics'].observing_run.measurement_sets[0].name = outvisname
newvislist = [self.outputvis]
context.clean_list_pending[i]['heuristics'].vislist = newvislist
def __str__(self):
# Format the MsSplit results.
s = 'Transformimagedata:\n'
s += '\tOriginal MS {vis} transformed to {outputvis}\n'.format(
vis=os.path.basename(self.vis),
outputvis=os.path.basename(self.outputvis))
return s
def __repr__(self):
return 'Transformimagedata({}, {})'.format(os.path.basename(self.vis), os.path.basename(self.outputvis))
[docs]class TransformimagedataInputs(mssplit.MsSplitInputs):
clear_pointing = vdp.VisDependentProperty(default=True)
modify_weights = vdp.VisDependentProperty(default=False)
wtmode = vdp.VisDependentProperty(default='')
replace = vdp.VisDependentProperty(default=False)
datacolumn = vdp.VisDependentProperty(default='corrected')
@vdp.VisDependentProperty
def outputvis(self):
output_dir = self.context.output_dir
if isinstance(self._outputvis, vdp.NullMarker):
# Need this to be in the working directory
# vis_root = os.path.splitext(self.vis)[0]
vis_root = os.path.splitext(os.path.basename(self.vis))[0]
return output_dir + '/' + vis_root + '_split.ms'
else:
return output_dir + '/' + os.path.basename(self.outputvis)
@outputvis.convert
def outputvis(self, value=''):
return value
def __init__(self, context, vis=None, output_dir=None,
outputvis=None, field=None, intent=None, spw=None,
datacolumn=None, chanbin=None, timebin=None, replace=None,
clear_pointing=None, modify_weights=None, wtmode=None):
# super(TransformimagedataInputs, self).__init__()
# set the properties to the values given as input arguments
self.context = context
self.vis = vis
self.output_dir = output_dir
self.outputvis = outputvis
self.field = field
self.intent = intent
self.spw = spw
self.datacolumn = datacolumn
self.chanbin = chanbin
self.timebin = timebin
self.replace = replace
if clear_pointing is not False:
clear_pointing = True
self.clear_pointing = clear_pointing
if modify_weights is not True:
modify_weights = False
self.modify_weights = modify_weights
self.wtmode = wtmode
[docs]@task_registry.set_equivalent_casa_task('hif_transformimagedata')
class Transformimagedata(mssplit.MsSplit):
Inputs = TransformimagedataInputs
[docs] def prepare(self):
inputs = self.inputs
# Test whether or not a split has been requested
"""
if inputs.field == '' and inputs.spw == '' and inputs.intent == '' and \
inputs.chanbin == 1 and inputs.timebin == '0s':
result = TransformimagedataResults(vis=inputs.vis, outputvis=inputs.outputvis)
LOG.warning('Output MS equals input MS %s' % (os.path.basename(inputs.vis)))
return
"""
# Split is required so create the results structure
result = TransformimagedataResults(vis=inputs.vis, outputvis=inputs.outputvis)
# Run CASA task
# Does this need a try / except block
visfields = []
visspws = []
for imageparam in inputs.context.clean_list_pending:
visfields.extend(imageparam['field'].split(','))
visspws.extend(imageparam['spw'].split(','))
visfields = set(visfields)
visfields = list(visfields)
visfields = ','.join(visfields)
visspws = set(visspws)
visspws = sorted(visspws)
visspws = ','.join(visspws)
mstransform_args = inputs.to_casa_args()
mstransform_args['field'] = visfields
mstransform_args['reindex'] = False
mstransform_args['spw'] = visspws
for dictkey in ('clear_pointing', 'modify_weights', 'wtmode'):
try:
del mstransform_args[dictkey]
except KeyError:
pass
mstransform_job = casa_tasks.mstransform(**mstransform_args)
self._executor.execute(mstransform_job)
return result
[docs] def analyse(self, result):
# Check for existence of the output vis.
if not os.path.exists(result.outputvis):
return result
inputs = self.inputs
# There seems to be a rerendering issue with replace. For now just
# remove the old file.
if inputs.replace:
shutil.rmtree(result.vis)
#shutil.move (result.outputvis, result.vis)
#result.outputvis = result.vis
# Import the new MS
to_import = os.path.abspath(result.outputvis)
observing_run = tablereader.ObservingRunReader.get_observing_run(to_import)
# Adopt same session as source measurement set
for ms in observing_run.measurement_sets:
LOG.debug('Setting session to %s for %s', self.inputs.ms.session, ms.basename)
ms.session = self.inputs.ms.session
ms.is_imaging_ms = True
# Note there will be only 1 MS in the temporary observing run structure
result.ms = observing_run.measurement_sets[0]
if inputs.clear_pointing:
LOG.info('Removing POINTING table from ' + ms.name)
with casa_tools.TableReader(ms.name + '/POINTING', nomodify=False) as table:
rows = table.rownumbers()
table.removerows(rows)
if inputs.modify_weights:
LOG.info('Re-initializing the weights in ' + ms.name)
if inputs.wtmode:
task = casa_tasks.initweights(ms.name, wtmode=inputs.wtmode)
else:
task = casa_tasks.initweights(ms.name)
self._executor.execute(task)
return result