"""
The restore data module provides a class for reimporting, reflagging, and
recalibrating a subset of the ASDMs belonging to a member OUS, using pipeline
flagging and calibration data products.
The basic restore data module assumes that the ASDMs, flagging, and calibration
data products are on disk in the rawdata directory in the format produced by
the ExportData class.
This class assumes that the required data products either
    o have been downloaded from the archive along with the ASDMs (not yet
      possible), or
    o are sitting on disk in a form which is compatible with what is
      produced by ExportData.
To test these classes, register some data with the pipeline using ImportData,
then execute:

    import pipeline
    vis = [ '<ASDM name>' ]

    # Create a pipeline context and register some data
    context = pipeline.Pipeline().context
    inputs = pipeline.tasks.RestoreData.Inputs(context, vis=vis)
    task = pipeline.tasks.RestoreData(inputs)
    results = task.execute(dry_run=False)
    results.accept(context)
"""
import glob
import os
import re
import sys
import shutil
import tarfile
import tempfile
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from pipeline.infrastructure import utils
from .. import applycal
from .. import importdata
from ..common import manifest
# Module-level logger, named after this module; used for all progress and
# error reporting in the classes below.
LOG = infrastructure.get_logger(__name__)
class RestoreDataResults(basetask.Results):
    """
    Results container for the restore data task.

    Holds the results of the import and applycal child tasks, the flagging
    summaries, and the CASA version, pipeline version and MPI server count
    recorded for the original pipeline run.
    """

    def __init__(self, importdata_results=None, applycal_results=None, flagging_summaries=None,
                 casa_version_orig=None, pipeline_version_orig=None, orig_mpi_servers=0):
        """
        Initialise the results object.

        Every argument is stored unchanged on the attribute of the same name.
        The list of measurement sets, mses, starts empty and is filled in by
        merge_with_context.
        """
        super().__init__()

        # Results of the child tasks
        self.importdata_results = importdata_results
        self.applycal_results = applycal_results

        # Populated when the results are merged into a context
        self.mses = []

        # Flagging state after the restore
        self.flagging_summaries = flagging_summaries

        # Provenance of the original run
        self.casa_version_orig = casa_version_orig
        self.pipeline_version_orig = pipeline_version_orig
        self.orig_mpi_servers = orig_mpi_servers

    def merge_with_context(self, context):
        """
        Merge the import and applycal results into the context.

        The import results are merged first, then every measurement set
        registered with the context is recorded in mses, and finally the
        applycal results (a list of results or a single result) are merged.
        """
        if self.importdata_results:
            for imported in self.importdata_results:
                imported.merge_with_context(context)

        # Remember every measurement set currently known to the context
        self.mses.extend(context.observing_run.measurement_sets)

        applied = self.applycal_results
        if not applied:
            return

        # A single result is handled the same way as a one element list
        if not isinstance(applied, list):
            applied = [applied]
        for result in applied:
            result.merge_with_context(context)

    def __repr__(self):
        ms_names = '\n\t'.join(ms.name for ms in self.mses)
        return 'RestoreDataResults:\n\t{0}'.format(ms_names)
@task_registry.set_equivalent_casa_task('h_restoredata')
class RestoreData(basetask.StandardTaskTemplate):
    """
    RestoreData is the base class for restoring flagged and calibrated
    data produced during a previous pipeline run and archived on disk.

    The restore proceeds in the following steps:

        - Imports the selected ASDMs from rawdata
        - Imports the flagversions for the selected ASDMs from ../rawdata
        - Imports the calibration data for the selected ASDMs from ../rawdata
        - Restores the final set of pipeline flags
        - Restores the final calibration state
        - Applies the calibrations
    """

    # link the accompanying inputs to this task
    Inputs = RestoreDataInputs

    # Override the default behavior for multi-vis tasks
    # Does this interfere with multi-vis behavior of
    # called tasks.
    is_multi_vis_task = True

    def prepare(self):
        """
        Run the complete restore sequence and return the results.

        Returns a RestoreDataResults object holding the import results, the
        applycal results, the flagging summaries, and the CASA version,
        pipeline version and MPI server count read from the pipeline
        manifest (all None when no manifest was found).
        """
        # Create a local alias for inputs, so we're not saying
        # 'self.inputs' everywhere
        inputs = self.inputs

        # Force inputs.vis and inputs.session to be a list.
        sessionlist = inputs.session
        if isinstance(sessionlist, str):
            sessionlist = [sessionlist]
        tmpvislist = inputs.vis
        if isinstance(tmpvislist, str):
            tmpvislist = [tmpvislist]

        # Names without a directory component are looked for in rawdata_dir;
        # names that already carry a directory are used unchanged.
        vislist = [vis if os.path.dirname(vis) else os.path.join(inputs.rawdata_dir, vis)
                   for vis in tmpvislist]

        # Download ASDMs from the archive or products_dir to rawdata_dir.
        #   TBD: Currently assumed done somehow

        # Copy the required calibration products from "someplace" on disk
        # (default ../products) to ../rawdata. The pipeline manifest file
        # if present is used to determine which files to copy. Otherwise
        # a file naming scheme is used. The latter is deprecated as it
        # requires the exportdata / restoredata tasks to be synchronized
        # but it is maintained for testing purposes.
        if inputs.copytoraw:
            self._do_copy_manifest_toraw('*pipeline_manifest.xml')
            pipemanifest = self._do_get_manifest('*pipeline_manifest.xml', '*cal*pipeline_manifest.xml')
            self._do_copytoraw(pipemanifest)
        else:
            pipemanifest = self._do_get_manifest('*pipeline_manifest.xml', '*cal*pipeline_manifest.xml')

        # Convert ASDMS assumed to be on disk in rawdata_dir. After this step
        # has been completed the MS and MS.flagversions directories will exist
        # and MS,flagversions will contain a copy of the original MS flags,
        # Flags.Original.
        #    TBD: Add error handling
        import_results = self._do_importasdm(sessionlist=sessionlist, vislist=vislist)

        # Restore final MS.flagversions and flags
        flag_version_name = 'Pipeline_Final'
        self._do_restore_flags(pipemanifest, flag_version_name=flag_version_name)

        # Get the session list and the visibility files associated with
        # each session.
        session_names, session_vislists = self._get_sessions()

        # Restore calibration tables
        self._do_restore_caltables(pipemanifest, session_names=session_names, session_vislists=session_vislists)

        # Import calibration apply lists
        self._do_restore_calstate(pipemanifest)

        # Apply the calibrations.
        apply_results = self._do_applycal()

        # Get a summary of the flagging state.
        flagging_summaries = self._get_flagging_summaries(session_names, session_vislists)

        # Extract CASA version and pipeline version for previous run from
        # pipeline manifest.
        casa_version, pipeline_version, num_mpi = self._extract_casa_pipeline_version(pipemanifest)

        # Return the results object, which will be used for the weblog
        return RestoreDataResults(import_results, apply_results, flagging_summaries, casa_version, pipeline_version,
                                  num_mpi)

    def analyse(self, results):
        """
        Return the results unchanged; no analysis is performed.
        """
        return results

    def _copy_files_toraw(self, filenames):
        """
        Copy each of the given files into rawdata_dir, keeping its base name.
        """
        rawdata_dir = self.inputs.rawdata_dir
        for filename in filenames:
            LOG.info('Copying %s to %s' % (filename, rawdata_dir))
            shutil.copy(filename, os.path.join(rawdata_dir, os.path.basename(filename)))

    def _do_copy_manifest_toraw(self, template):
        """
        Copy the pipeline manifest file(s) from products_dir to rawdata_dir.

        template is a glob pattern, relative to products_dir, selecting the
        manifest files to copy. Nothing is copied if no file matches.
        """
        inputs = self.inputs

        # Download the pipeline manifest file from the archive or
        #     products_dir to rawdata_dir
        manifest_files = glob.glob(os.path.join(inputs.products_dir, template))
        self._copy_files_toraw(manifest_files)

    def _do_get_manifest(self, template1, template2):
        """
        Get the pipeline manifest object.

        template1 and template2 are glob patterns relative to rawdata_dir.
        template2 is only consulted when template1 matches more than one
        file, and its matches are only used when they form a non-empty,
        strictly smaller set.

        Returns the parsed manifest, or None if no manifest file was found.
        """
        inputs = self.inputs

        # Get the list of files in the rawdata directory
        #   First find all the manifest files of any kind
        #   If there is more than one file in that list then try the more
        #   restrictive template
        manifestfiles = glob.glob(os.path.join(inputs.rawdata_dir, template1))
        if len(manifestfiles) > 1:
            manifestfiles2 = glob.glob(os.path.join(inputs.rawdata_dir, template2))
            if 0 < len(manifestfiles2) < len(manifestfiles):
                manifestfiles = manifestfiles2

        if not manifestfiles:
            return None

        # Parse manifest file
        #   There should be only one of these so pick one
        pipemanifest = manifest.PipelineManifest('')
        pipemanifest.import_xml(manifestfiles[0])
        return pipemanifest

    def _do_copytoraw(self, pipemanifest):
        """
        Copy the flag versions, calibration tables and calibration apply
        lists from products_dir to rawdata_dir.

        When pipemanifest is not None the file names are taken from the
        manifest. Otherwise every file in products_dir matching the
        '*.flagversions.tgz', '*.caltables.tgz' and '*.calapply.txt' naming
        scheme is copied.
        """
        inputs = self.inputs

        # The manifest is optional: only query it when one was found,
        # otherwise the naming scheme fallback below could never be reached.
        ouss = pipemanifest.get_ous() if pipemanifest is not None else None

        # Download flag versions
        #   Download from the archive or products_dir to rawdata_dir.
        if pipemanifest is not None:
            inflagfiles = [os.path.join(inputs.products_dir, flagfile)
                           for flagfile in pipemanifest.get_final_flagversions(ouss).values()]
        else:
            inflagfiles = glob.glob(os.path.join(inputs.products_dir, '*.flagversions.tgz'))
        self._copy_files_toraw(inflagfiles)

        # Download calibration tables
        #   Download calibration files from the archive or products_dir to
        #   rawdata_dir.
        if pipemanifest is not None:
            incaltables = [os.path.join(inputs.products_dir, caltable)
                           for caltable in pipemanifest.get_caltables(ouss).values()]
        else:
            incaltables = glob.glob(os.path.join(inputs.products_dir, '*.caltables.tgz'))
        self._copy_files_toraw(incaltables)

        # Download calibration apply lists
        #   Download from the archive or products_dir to rawdata_dir.
        #   TBD: Currently assumed done somehow
        if pipemanifest is not None:
            inapplycals = [os.path.join(inputs.products_dir, applycals)
                           for applycals in pipemanifest.get_applycals(ouss).values()]
        else:
            inapplycals = glob.glob(os.path.join(inputs.products_dir, '*.calapply.txt'))
        self._copy_files_toraw(inapplycals)

    def _do_importasdm(self, sessionlist, vislist):
        """
        Run the ImportData task on vislist / sessionlist and return its
        results. The results are merged into the context by the executor.
        """
        inputs = self.inputs

        # The asis is temporary until we get the EVLA / ALMA factoring
        # figured out.
        container = vdp.InputsContainer(importdata.ImportData, inputs.context, vis=vislist, session=sessionlist,
                                        save_flagonline=False, lazy=inputs.lazy, bdfflags=inputs.bdfflags,
                                        asis=inputs.asis, ocorr_mode=inputs.ocorr_mode, createmms='false')
        importdata_task = importdata.ImportData(container)
        return self._executor.execute(importdata_task, merge=True)

    def _do_restore_flags(self, pipemanifest, flag_version_name='Pipeline_Final'):
        """
        Replace the flagversions of every MS in the context with the archived
        ones and restore the flag version named flag_version_name.

        The archive name comes from the manifest when it provides an OUS,
        otherwise from the '<ms>.flagversions.tgz' naming scheme. Removal,
        extraction and the flagmanager call are skipped in dry-run mode.

        Returns the list of flagversions paths, one per MS.
        """
        inputs = self.inputs
        flagversionlist = []

        ouss = pipemanifest.get_ous() if pipemanifest is not None else None

        # Loop over MS list in working directory
        for ms in inputs.context.observing_run.measurement_sets:

            # Remove imported MS.flagversions from working directory
            flagversion = ms.basename + '.flagversions'
            flagversionpath = os.path.join(inputs.output_dir, flagversion)
            if os.path.exists(flagversionpath):
                LOG.info('Removing default flagversion for %s' % ms.basename)
                if not self._executor._dry_run:
                    shutil.rmtree(flagversionpath)

            # Untar MS.flagversions file in rawdata_dir to output_dir
            if ouss is not None:
                tarfilename = os.path.join(inputs.rawdata_dir,
                                           pipemanifest.get_final_flagversions(ouss)[ms.basename])
            else:
                tarfilename = os.path.join(inputs.rawdata_dir,
                                           ms.basename + '.flagversions.tgz')

            LOG.info('Extracting %s' % flagversion)
            LOG.info(' From %s' % tarfilename)
            LOG.info(' Into %s' % inputs.output_dir)
            # The archive is opened even in dry-run mode; only the
            # extraction itself is skipped.
            with tarfile.open(tarfilename, 'r:gz') as tar:
                if not self._executor._dry_run:
                    tar.extractall(path=inputs.output_dir)

            # Restore final flags version using flagmanager
            LOG.info('Restoring final flags for %s from flag version %s' % (ms.basename, flag_version_name))
            if not self._executor._dry_run:
                task = casa_tasks.flagmanager(vis=ms.name,
                                              mode='restore',
                                              versionname=flag_version_name)
                try:
                    self._executor.execute(task)
                except Exception:
                    LOG.error("Application of final flags failed for %s" % ms.basename)
                    raise

            flagversionlist.append(flagversionpath)

        return flagversionlist

    def _do_restore_calstate(self, pipemanifest):
        """
        Import the archived calibration apply list of every MS in the context
        into the context's calibration library.

        The apply list name comes from the manifest when it provides an OUS,
        otherwise from the '<ms>.calapply.txt' naming scheme. The first file
        is imported with append=False and every later one with append=True.
        Nothing is imported in dry-run mode.
        """
        inputs = self.inputs

        ouss = pipemanifest.get_ous() if pipemanifest is not None else None

        # sys.stdout may have been replaced by a stream whose encoding is
        # None (or missing); fall back to UTF-8 rather than failing in
        # str.encode.
        encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'

        # Loop over MS list in working directory
        append = False
        for ms in inputs.context.observing_run.measurement_sets:
            if ouss is not None:
                applyfile_name = os.path.join(inputs.rawdata_dir,
                                              pipemanifest.get_applycals(ouss)[ms.basename])
            else:
                applyfile_name = os.path.join(inputs.rawdata_dir, ms.basename + '.calapply.txt')

            LOG.info('Restoring calibration state for %s from %s'
                     '' % (ms.basename, applyfile_name))

            if not self._executor._dry_run:
                # Write converted calstate to a temporary file and use this
                # for the import. The temporary file will automatically be
                # deleted once out of scope
                with tempfile.NamedTemporaryFile() as tmpfile:
                    LOG.trace('Writing converted calstate to %s'
                              '' % tmpfile.name)
                    converted = self._convert_calstate_paths(applyfile_name)
                    tmpfile.write(converted.encode(encoding))
                    # Make sure the content is on disk before it is read
                    # back through the file name.
                    tmpfile.flush()

                    inputs.context.callibrary.import_state(tmpfile.name,
                                                           append=append)

            # Every file after the first is appended to the existing state
            append = True

    def _convert_calstate_paths(self, applyfile):
        """
        Convert paths in the exported calstate to point to the new output
        directory.

        Every unix style path found in applyfile is replaced by its base name
        joined onto output_dir.

        Returns the converted file content as a single string.
        """
        # regex to match unix paths
        unix_path = re.compile('((?:\\/[\\w\\.\\-]+)+)',
                               re.IGNORECASE | re.DOTALL)

        # define a function that replaces directory names with our new output
        # directory
        def repfn(matchobj):
            basename = os.path.basename(matchobj.group(0))
            return os.path.join(self.inputs.output_dir, basename)

        # search-and-replace directory names in the exported calstate file
        with open(applyfile, 'r') as f:
            return unix_path.sub(repfn, f.read())

    def _do_restore_caltables(self, pipemanifest, session_names=None, session_vislists=None):
        """
        Extract the archived calibration tables of every session into
        output_dir.

        session_names and session_vislists are parallel lists: the second
        holds, for each session, the visibility files belonging to it. Only
        archive members whose name starts with the base name of one of those
        visibility files are extracted. Nothing is extracted in dry-run mode.

        Raises FileNotFoundError if the archive has to be located by naming
        scheme and no file matches.
        """
        inputs = self.inputs

        if pipemanifest is not None:
            ouss = pipemanifest.get_ous()
        else:
            ouss = None

        # The manifest lookup does not depend on the session, so do it once.
        caltables = pipemanifest.get_caltables(ouss) if ouss is not None else None

        # Determine the OUS uid
        ps = inputs.context.project_structure
        if ps is None or ps.ousstatus_entity_id == 'unknown':
            ousid = ''
        else:
            ousid = ps.ousstatus_entity_id.translate(str.maketrans(':/', '__')) + '.'

        # Loop over sessions
        for index, session in enumerate(session_names):

            # Get the visibility list for that session.
            vislist = session_vislists[index]

            # Open the tarfile and get the names
            if caltables is not None:
                try:
                    tarfilename = os.path.join(inputs.rawdata_dir, caltables[session])
                except Exception:
                    # Best effort: use the manifest's 'default' entry when
                    # the archive for this session cannot be resolved.
                    tarfilename = os.path.join(inputs.rawdata_dir, caltables['default'])
            elif ousid == '':
                pattern = os.path.join(inputs.rawdata_dir, '*' + session + '.caltables.tgz')
                candidates = glob.glob(pattern)
                if not candidates:
                    raise FileNotFoundError('No calibration table archive matches %s' % pattern)
                tarfilename = candidates[0]
            else:
                tarfilename = os.path.join(inputs.rawdata_dir,
                                           ousid + session + '.caltables.tgz')

            # NOTE(review): extractall trusts the member paths stored in the
            # archive; presumably safe because the archives are pipeline
            # products - confirm.
            with tarfile.open(tarfilename, 'r:gz') as tar:
                tarmembers = tar.getmembers()

                # Loop over the visibilities associated with that session
                for vis in vislist:
                    vis_basename = os.path.basename(vis)
                    LOG.info('Restoring caltables for %s from %s'
                             '' % (vis_basename, tarfilename))
                    extractlist = []
                    for member in tarmembers:
                        if member.name.startswith(vis_basename):
                            extractlist.append(member)
                            # it is uncertain whether or not slash (/) exists at the end
                            if member.name.endswith(('.tbl/', '.tbl')):
                                LOG.info(' Extracting caltable %s' % member.name)

                    if not self._executor._dry_run:
                        if len(extractlist) == len(tarmembers):
                            tar.extractall(path=inputs.output_dir)
                        else:
                            tar.extractall(path=inputs.output_dir, members=extractlist)

    def _do_applycal(self):
        """
        Run the Applycal task with default inputs for the context and return
        its results. The results are merged into the context by the executor.
        """
        container = vdp.InputsContainer(applycal.Applycal, self.inputs.context)
        applycal_task = applycal.Applycal(container)
        return self._executor.execute(applycal_task, merge=True)

    def _get_sessions(self, sessions=None, vis=None):
        """
        Group the visibility files by session.

        If sessions is undefined or empty the session of every MS in the
        context is used; if vis is undefined or empty the name of every MS in
        the context is used. The two lists are matched by position, and
        visibility files beyond the end of the session list are assigned to
        the last session.

        Returns a tuple (session_names, session_vis_list): the unique session
        names in order of first appearance, and for each of them the list of
        visibility files associated with it.
        """
        inputs = self.inputs
        all_mses = inputs.context.observing_run.measurement_sets

        # Get the MS list from the context by default.
        wkvis = vis if vis else [ms.name for ms in all_mses]

        # If the input session list is empty determine the sessions from
        # the context.
        wksessions = sessions if sessions else [ms.session for ms in all_mses]

        # Determine the unique sessions, keeping the order in which they
        # first appear.
        session_names = list(dict.fromkeys(wksessions))

        # Initialize the output visibility file lists
        session_vis_list = [[] for _ in session_names]
        session_index = {name: index for index, name in enumerate(session_names)}

        # Assign the visibility files to the correct session
        for j, visname in enumerate(wkvis):
            if j < len(wksessions):
                # Match the session names if possible
                session_vis_list[session_index[wksessions[j]]].append(visname)
            else:
                # Assign to the last session
                session_vis_list[len(session_names) - 1].append(visname)

        # Log the sessions
        for name, vislist in zip(session_names, session_vis_list):
            LOG.info('Visibility list for session %s is %s' % (name, vislist))

        return session_names, session_vis_list

    def _get_flagging_summaries(self, session_names, session_vislists):
        """
        Run a flagdata summary, restricted to the CASA intent corresponding
        to 'TARGET', for every visibility file of every session.

        Returns a dictionary keyed by session name whose values are
        dictionaries mapping the base name of each visibility file to the
        result of its flagdata summary job.
        """
        # Initialize summaries dictionary.
        summaries = {}

        # Extract summaries for each session.
        for session_name, vislist in zip(session_names, session_vislists):
            session_summaries = summaries[session_name] = {}

            # Extract summary for each vis.
            for vis in vislist:
                vis_name = os.path.basename(vis)
                LOG.info("Creating flagging summary for session: {}, vis: {}".format(session_name, vis_name))

                # Get CASA intent corresponding to 'TARGET'.
                ms = self.inputs.context.observing_run.get_ms(name=vis)
                casa_intent = utils.to_CASA_intent(ms, 'TARGET')

                # Create and execute flagdata summary job, store result in dictionary.
                job = casa_tasks.flagdata(vis=vis_name, mode='summary', fieldcnt=True, intent=casa_intent)
                session_summaries[vis_name] = self._executor.execute(job)

        return summaries

    @staticmethod
    def _extract_casa_pipeline_version(pipemanifest):
        """
        Read the provenance of the original pipeline run from the manifest.

        Returns a tuple (casa_version, pipeline_version, total_mpi_servers),
        where the last element is the sum of 'num_mpi_servers' over all
        execution nodes. All three are None when there is no manifest or the
        manifest provides no OUS.
        """
        ouss = pipemanifest.get_ous() if pipemanifest is not None else None
        if ouss is None:
            return None, None, None

        casa_version = pipemanifest.get_casa_version(ouss)
        pipeline_version = pipemanifest.get_pipeline_version(ouss)
        total_mpi_servers = sum(int(node.get('num_mpi_servers'))
                                for node in pipemanifest.get_execution_nodes(ouss))
        return casa_version, pipeline_version, total_mpi_servers