"""
The exportdata module provides base classes for preparing data products
on disk for upload to the archive.
To test these classes, register some data with the pipeline using ImportData,
then execute:
import pipeline
vis = [ '<MS name>' ]
# Create a pipeline context and register some data
context = pipeline.Pipeline().context
inputs = pipeline.tasks.ImportData.Inputs(context, vis=vis)
task = pipeline.tasks.ImportData(inputs)
results = task.execute(dry_run=False)
results.accept(context)
# Run some other pipeline tasks, e.g. flagging, calibration,
# and imaging, in a similar manner
# Execute the export data task. The details of
# what gets exported depend on which tasks were run
# previously, but may include the following:
# TBD
inputs = pipeline.tasks.exportdata.ExportData.Inputs(context,
vis, output_dir, sessions, pprfile, products_dir)
task = pipeline.tasks.exportdata.ExportData(inputs)
results = task.execute(dry_run = True)
"""
import collections
import copy
import errno
import fnmatch
import glob
import io
import os
import shutil
import sys
import tarfile
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.imagelibrary as imagelibrary
import pipeline.infrastructure.vdp as vdp
from pipeline import environment
from pipeline.infrastructure.filenamer import fitsname
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from ..common import manifest
# the logger for this module
LOG = infrastructure.get_logger(__name__)
# Record of the per-OUS standard file products. ExportData._do_standard_ous_products
# fills the fields, in order, with: the pipeline processing request (or None),
# the weblog tarball, the CASA commands log, the pipeline script and the
# restore script (the string 'Undefined' when calibration products are not exported).
StdFileProducts = collections.namedtuple('StdFileProducts', 'ppr_file weblog_file casa_commands_file casa_pipescript casa_restore_script')
# product name utility
class PipelineProductNameBuiler(object):
    """
    Build the file names of the pipeline export products.

    Every method is a class method, so the class is used without being
    instantiated. The public methods return a product file name and, when
    ``output_dir`` is given, prepend that directory with ``os.path.join``.
    """

    @classmethod
    def __build(cls, *args, **kwargs):
        """
        Join the string form of ``args`` with the ``separator`` keyword
        (``'.'`` when the keyword is not supplied).
        """
        separator = kwargs.get('separator', '.')
        return separator.join(map(str, args))

    @classmethod
    def _join_dir(cls, name, output_dir=None):
        """Prefix ``name`` with ``output_dir`` unless ``output_dir`` is None."""
        if output_dir is not None:
            name = os.path.join(output_dir, name)
        return name

    @classmethod
    def _build_from_oussid(cls, basename, ousstatus_entity_id=None, output_dir=None):
        """Return ``<ousstatus_entity_id>.<basename>``, or ``basename`` when no id is given."""
        if ousstatus_entity_id is None:
            name = basename
        else:
            name = cls.__build(ousstatus_entity_id, basename)
        return cls._join_dir(name, output_dir)

    @classmethod
    def _build_from_ps_oussid(cls, basename, project_structure=None, ousstatus_entity_id=None, output_dir=None):
        """
        Like ``_build_from_oussid`` but the id prefix is only applied when a
        project structure is available and its ``ousstatus_entity_id`` is
        not the string ``'unknown'``.
        """
        if project_structure is None or project_structure.ousstatus_entity_id == 'unknown':
            name = basename
        else:
            name = cls._build_from_oussid(basename, ousstatus_entity_id=ousstatus_entity_id)
        return cls._join_dir(name, output_dir)

    @classmethod
    def _build_from_oussid_session(cls, basename, ousstatus_entity_id=None, session_name=None, output_dir=None):
        """Return ``<ousstatus_entity_id>.<session_name>.<basename>``."""
        name = cls.__build(ousstatus_entity_id, session_name, basename)
        return cls._join_dir(name, output_dir)

    @classmethod
    def _build_calproduct_name(cls, basename, aux_product=False, output_dir=None):
        """Prefix ``basename`` with ``'auxcal'`` (auxiliary product) or ``'cal'``, without a separator."""
        prefix = 'auxcal' if aux_product else 'cal'
        name = cls.__build(prefix, basename, separator='')
        return cls._join_dir(name, output_dir)

    @classmethod
    def _build_from_vis(cls, basename, vis, output_dir=None):
        """Return ``<basename of vis>.<basename>``."""
        name = cls.__build(os.path.basename(vis), basename)
        return cls._join_dir(name, output_dir)

    @classmethod
    def weblog(cls, project_structure=None, ousstatus_entity_id=None, output_dir=None):
        """Name of the weblog tarball (``weblog.tgz``, optionally id-prefixed)."""
        return cls._build_from_ps_oussid('weblog.tgz',
                                         project_structure=project_structure,
                                         ousstatus_entity_id=ousstatus_entity_id,
                                         output_dir=output_dir)

    @classmethod
    def casa_script(cls, basename, project_structure=None, ousstatus_entity_id=None, output_dir=None):
        """Name of an exported script or log called ``basename`` (optionally id-prefixed)."""
        return cls._build_from_ps_oussid(basename,
                                         project_structure=project_structure,
                                         ousstatus_entity_id=ousstatus_entity_id,
                                         output_dir=output_dir)

    @classmethod
    def manifest(cls, basename, ousstatus_entity_id, output_dir=None):
        """Name of the pipeline manifest file."""
        return cls._build_from_oussid(basename,
                                      ousstatus_entity_id=ousstatus_entity_id,
                                      output_dir=output_dir)

    @classmethod
    def calapply_list(cls, vis, aux_product=False, output_dir=None):
        """Name of the calibration apply list for ``vis`` (``<vis>.calapply.txt`` or ``<vis>.auxcalapply.txt``)."""
        basename = cls._build_calproduct_name('apply.txt', aux_product=aux_product)
        return cls._build_from_vis(basename, vis, output_dir=output_dir)

    @classmethod
    def caltables(cls, ousstatus_entity_id=None, session_name=None, aux_product=False, output_dir=None):
        """Name of the per-session calibration tables tarball."""
        basename = cls._build_calproduct_name('tables.tgz', aux_product=aux_product)
        # Forward the caller's output_dir; it used to be replaced by a
        # hard-coded None, which silently dropped the requested directory.
        return cls._build_from_oussid_session(basename=basename,
                                              ousstatus_entity_id=ousstatus_entity_id,
                                              session_name=session_name,
                                              output_dir=output_dir)

    @classmethod
    def aqua_report(cls, basename, project_structure=None, ousstatus_entity_id=None, output_dir=None):
        """Name of the AQUA report called ``basename`` (optionally id-prefixed)."""
        return cls._build_from_ps_oussid(basename,
                                         project_structure=project_structure,
                                         ousstatus_entity_id=ousstatus_entity_id,
                                         output_dir=output_dir)

    @classmethod
    def auxiliary_products(cls, basename, ousstatus_entity_id=None, output_dir=None):
        """Name of the auxiliary products file called ``basename``."""
        return cls._build_from_oussid(basename,
                                      ousstatus_entity_id=ousstatus_entity_id,
                                      output_dir=output_dir)
class ExportDataResults(basetask.Results):
    """
    Results of an export data task: the names of the exported products.
    """

    def __init__(self, pprequest='', sessiondict=None, msvisdict=None, calvisdict=None, calimages=None, targetimages=None, weblog='',
                 pipescript='', restorescript='', commandslog='', manifest=''):
        """
        Initialise the results object with the names of the exported products.

        Dictionary arguments left as None become fresh empty OrderedDicts
        and image arguments left as None become fresh empty lists, so no
        mutable default is shared between instances.
        """
        super(ExportDataResults, self).__init__()

        if sessiondict is None:
            sessiondict = collections.OrderedDict()
        if msvisdict is None:
            msvisdict = collections.OrderedDict()
        if calvisdict is None:
            calvisdict = collections.OrderedDict()
        if calimages is None:
            calimages = []
        if targetimages is None:
            targetimages = []

        self.pprequest = pprequest
        self.sessiondict = sessiondict
        self.msvisdict = msvisdict
        # Store the calibration dictionary itself; msvisdict used to be
        # assigned here by mistake, discarding the calvisdict argument.
        self.calvisdict = calvisdict
        self.calimages = calimages
        self.targetimages = targetimages
        self.weblog = weblog
        self.pipescript = pipescript
        self.restorescript = restorescript
        self.commandslog = commandslog
        self.manifest = manifest

    def __repr__(self):
        s = 'ExportData results:\n'
        return s
[docs]@task_registry.set_equivalent_casa_task('h_exportdata')
@task_registry.set_casa_commands_comment('The output data products are computed.')
class ExportData(basetask.StandardTaskTemplate):
"""
ExportData is the base class for exporting data to the products
subdirectory. It performs the following operations:
- Saves the pipeline processing request in an XML file
- Saves the final flags per ASDM in a compressed / tarred CASA flag
versions file
- Saves the final calibration apply list per ASDM in a text file
- Saves the final set of caltables per session in a compressed /
tarred file containing CASA tables
- Saves the final web log in a compressed / tarred file
- Saves the final CASA command log in a text file
- Saves the final pipeline script in a Python file
- Saves the final pipeline restore script in a Python file
- Saves the images in FITS cubes one per target and spectral window
"""
# link the accompanying inputs to this task
Inputs = ExportDataInputs
# Override the default behavior for multi-vis tasks
is_multi_vis_task = True
# name builder
NameBuilder = PipelineProductNameBuiler
def prepare(self):
    """
    Export every data product requested by the task inputs.

    The products directory is created when missing; then the per-OUS,
    per-MS and per-session products, the calibrator and target FITS images
    and finally the pipeline manifest are produced, in that order.

    :return: the results object recording the names of the exported products
    :rtype: ExportDataResults
    """
    inputs = self.inputs
    context = inputs.context
    products_dir = inputs.products_dir

    # A products directory that already exists is fine; any other failure
    # to create it is propagated.
    try:
        LOG.trace('Creating products directory: {!s}'.format(products_dir))
        os.makedirs(products_dir)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise

    # Standard OUS id string and the (initially empty) results object.
    oussid = self.get_oussid(context)
    result = ExportDataResults()

    # The vis and session lists always describe the calibration MSes,
    # whatever the value of inputs.imaging_products_only.
    session_list, session_names, session_vislists, vislist = self._make_lists(
        context, inputs.session, inputs.vis, imaging_only_mses=False)

    # Per-OUS file products: the processing request, the weblog tarball,
    # the pipeline script, the restore script (only when calibration
    # products are exported) and the CASA commands log.
    recipe_name = self.get_recipename(context)
    prefix = oussid + '.' + recipe_name if recipe_name else oussid
    stdfproducts = self._do_standard_ous_products(
        context, inputs.exportcalprods, prefix, inputs.pprfile, session_list, vislist, inputs.output_dir,
        products_dir)
    if stdfproducts.ppr_file:
        result.pprequest = os.path.basename(stdfproducts.ppr_file)
    result.weblog = os.path.basename(stdfproducts.weblog_file)
    result.pipescript = os.path.basename(stdfproducts.casa_pipescript)
    result.restorescript = (os.path.basename(stdfproducts.casa_restore_script)
                            if inputs.exportcalprods else 'Undefined')
    result.commandslog = os.path.basename(stdfproducts.casa_commands_file)

    # Per-MS products: tarred MSes and/or flag versions plus apply lists.
    msvisdict = collections.OrderedDict()
    calvisdict = collections.OrderedDict()
    calibration_side = not inputs.imaging_products_only
    if calibration_side and inputs.exportmses:
        msvisdict = self._do_ms_products(context, vislist, products_dir)
    if calibration_side and inputs.exportcalprods:
        calvisdict = self._do_standard_ms_products(context, vislist, products_dir)
    result.msvisdict = msvisdict
    result.calvisdict = calvisdict

    # Per-session products: tarred calibration tables.
    sessiondict = collections.OrderedDict()
    if calibration_side:
        if inputs.exportcalprods:
            sessiondict = self._do_standard_session_products(context, oussid, session_names, session_vislists,
                                                             products_dir)
        elif inputs.exportmses:
            # No caltables are exported, but the manifest still needs the
            # vis names of each session (stored as a one-element tuple).
            for idx, name in enumerate(session_names):
                sessiondict[name] = ([os.path.basename(visfile) for visfile in session_vislists[idx]], )
    result.sessiondict = sessiondict

    # Calibrator images to FITS.
    calimages_list, calimages_fitslist = self._export_images(context, True, inputs.calintents,
                                                             inputs.calimages, products_dir)
    result.calimages = (calimages_list, calimages_fitslist)

    # Science target images to FITS.
    targetimages_list, targetimages_fitslist = self._export_images(context, False, 'TARGET',
                                                                   inputs.targetimages, products_dir)
    result.targetimages = (targetimages_list, targetimages_fitslist)

    # Pipeline manifest.
    cal_fits_names = [os.path.basename(image) for image in calimages_fitslist]
    target_fits_names = [os.path.basename(image) for image in targetimages_fitslist]
    pipemanifest = self._make_pipe_manifest(context, oussid, stdfproducts, sessiondict, msvisdict,
                                            inputs.exportmses, calvisdict, inputs.exportcalprods,
                                            cal_fits_names, target_fits_names)
    casa_pipe_manifest = self._export_pipe_manifest(prefix, 'pipeline_manifest.xml', products_dir,
                                                    pipemanifest)
    result.manifest = os.path.basename(casa_pipe_manifest)

    # The results object is what the weblog is rendered from.
    return result
def analyse(self, results):
    """
    Pass the export results through untouched.

    No analysis is carried out for this task: the object that is handed
    in is the object that is handed back.

    :rtype: :class:~`ExportDataResults`
    """
    return results
def get_oussid(self, context):
    """
    Return the sanitized OUS status id used as a file name prefix.

    'unknown' is returned when the context has no project structure or
    the project structure reports the id 'unknown'; otherwise every ':'
    and '/' of the id is replaced by '_'.
    """
    ps = context.project_structure
    if ps is not None and ps.ousstatus_entity_id != 'unknown':
        return ps.ousstatus_entity_id.replace(':', '_').replace('/', '_')
    return 'unknown'
def get_recipename(self, context):
    """
    Return the recipe name recorded in the project structure.

    An empty string is returned when the context has no project structure
    or the recorded recipe name is 'Undefined'.
    """
    ps = context.project_structure
    if ps is not None and ps.recipe_name != 'Undefined':
        return ps.recipe_name
    return ''
def _make_lists(self, context, session, vis, imaging_only_mses=False):
"""
Create the vis and sessions lists
"""
# Force inputs.vis to be a list.
vislist = vis
if isinstance(vislist, str):
vislist = [vislist]
if imaging_only_mses:
vislist = [vis for vis in vislist if context.observing_run.get_ms(name=vis).is_imaging_ms]
else:
vislist = [vis for vis in vislist if not context.observing_run.get_ms(name=vis).is_imaging_ms]
# Get the session list and the visibility files associated with
# each session.
session_list, session_names, session_vislists= self._get_sessions( \
context, session, vislist)
return session_list, session_names, session_vislists, vislist
def _do_standard_ous_products(self, context, exportcalprods, oussid, pprfile, session_list, vislist, output_dir,
                              products_dir):
    """
    Export the standard per-OUS products and collect their names.

    :return: a StdFileProducts record; ``ppr_file`` is None when no
        processing request was exported and ``casa_restore_script`` is
        'Undefined' when ``exportcalprods`` is false.
    """
    # Pipeline processing request: normally at most one, and only the first
    # exported file is recorded. No file is exported when none is located.
    exported_pprs = self._export_pprfile(context, output_dir, products_dir, oussid, pprfile)
    ppr_file = os.path.basename(exported_pprs[0]) if exported_pprs else None

    # Tarball of the weblog.
    weblog_file = self._export_weblog(context, products_dir, oussid)

    # CASA commands log and pipeline script, exported independently of the weblog.
    casa_commands_file = self._export_casa_commands_log(context, context.logs['casa_commands'], products_dir,
                                                        oussid)
    casa_pipescript = self._export_casa_script(context, context.logs['pipeline_script'], products_dir, oussid)

    # The restore script only exists when calibration products are exported.
    if exportcalprods:
        casa_restore_script = self._export_casa_restore_script(context, context.logs['pipeline_restore_script'],
                                                               products_dir, oussid, vislist, session_list)
    else:
        casa_restore_script = 'Undefined'

    return StdFileProducts(ppr_file=ppr_file,
                           weblog_file=weblog_file,
                           casa_commands_file=casa_commands_file,
                           casa_pipescript=casa_pipescript,
                           casa_restore_script=casa_restore_script)
def _do_ms_products(self, context, vislist, products_dir):
"""
Tar up the final calibrated mses and put them in the products
directory.
Used for reprocessing applications
"""
# Loop over the measurements sets in the working directory and tar
# them up.
mslist = []
for visfile in vislist:
ms_file = self._export_final_ms( context, visfile, products_dir)
mslist.append(ms_file)
# Create the ordered vis dictionary
# The keys are the base vis names
# The values are the ms files
visdict = collections.OrderedDict()
for i in range(len(vislist)):
visdict[os.path.basename(vislist[i])] = \
os.path.basename(mslist[i])
return visdict
def _do_standard_ms_products(self, context, vislist, products_dir):
"""
Generate the per ms standard products
"""
# Loop over the measurements sets in the working directory and
# save the final flags using the flag manager.
flag_version_name = 'Pipeline_Final'
for visfile in vislist:
self._save_final_flagversion(visfile, flag_version_name)
# Copy the final flag versions to the data products directory
# and tar them up.
flag_version_list = []
for visfile in vislist:
flag_version_file = self._export_final_flagversion( \
context, visfile, flag_version_name, \
products_dir)
flag_version_list.append(flag_version_file)
# Loop over the measurements sets in the working directory, and
# create the calibration apply file(s) in the products directory.
apply_file_list = []
for visfile in vislist:
apply_file = self._export_final_applylist(context, \
visfile, products_dir)
apply_file_list.append(apply_file)
# Create the ordered vis dictionary
# The keys are the base vis names
# The values are a tuple containing the flags and applycal files
visdict = collections.OrderedDict()
for i in range(len(vislist)):
visdict[os.path.basename(vislist[i])] = \
(os.path.basename(flag_version_list[i]), \
os.path.basename(apply_file_list[i]))
return visdict
def _do_standard_session_products(self, context, oussid, session_names, session_vislists, products_dir,
imaging=False):
"""
Generate the per ms standard products
"""
# Export tar files of the calibration tables one per session
caltable_file_list = []
for i in range(len(session_names)):
caltable_file = self._export_final_calfiles(context, oussid,
session_names[i], session_vislists[i], products_dir, imaging=imaging)
caltable_file_list.append(caltable_file)
# Create the ordered session dictionary
# The keys are the session names
# The values are a tuple containing the vislist and the caltables
sessiondict = collections.OrderedDict()
for i in range(len(session_names)):
sessiondict[session_names[i]] = \
([os.path.basename(visfile) for visfile in session_vislists[i]], \
os.path.basename(caltable_file_list[i]))
return sessiondict
def _do_if_auxiliary_products(self, oussid, output_dir, products_dir, vislist, imaging_products_only):
"""
Generate the auxiliary products
"""
if imaging_products_only:
contfile_name = 'cont.dat'
fluxfile_name = 'Undefined'
antposfile_name = 'Undefined'
else:
fluxfile_name = 'flux.csv'
antposfile_name = 'antennapos.csv'
contfile_name = 'cont.dat'
empty = True
# Get the flux, antenna position, and continuum subtraction
# files and test to see if at least one of them exists
flux_file = os.path.join(output_dir, fluxfile_name)
antpos_file = os.path.join(output_dir, antposfile_name)
cont_file = os.path.join(output_dir, contfile_name)
if os.path.exists(flux_file) or os.path.exists(antpos_file) or os.path.exists(cont_file):
empty = False
# Export the general and target source template flagging files
# The general template flagging files are not required for the restore but are
# informative to the user.
# Whether or not the target template files should be exported to the archive depends
# on the final place of the target flagging step in the work flow and
# how flags will or will not be stored back into the ASDM.
targetflags_filelist = []
if self.inputs.imaging_products_only:
flags_file_list = glob.glob('*.flagtargetstemplate.txt')
elif not vislist:
flags_file_list = glob.glob('*.flagtemplate.txt')
flags_file_list.extend(glob.glob('*.flagtsystemplate.txt'))
else:
flags_file_list = glob.glob('*.flag*template.txt')
for file_name in flags_file_list:
flags_file = os.path.join(output_dir, file_name)
if os.path.exists(flags_file):
empty = False
targetflags_filelist.append(flags_file)
else:
targetflags_filelist.append('Undefined')
if empty:
return None
# Create the tarfile
cwd = os.getcwd()
try:
os.chdir(output_dir)
# Define the name of the output tarfile
tarfilename = '{}.auxproducts.tgz'.format(oussid)
LOG.info('Saving auxiliary data products in %s', tarfilename)
# Open tarfile
with tarfile.open(os.path.join(products_dir, tarfilename), 'w:gz') as tar:
# Save flux file
if os.path.exists(flux_file):
tar.add(flux_file, arcname=os.path.basename(flux_file))
LOG.info('Saving auxiliary data product %s in %s', os.path.basename(flux_file), tarfilename)
else:
LOG.info('Auxiliary data product flux.csv does not exist')
# Save antenna positions file
if os.path.exists(antpos_file):
tar.add(antpos_file, arcname=os.path.basename(antpos_file))
LOG.info('Saving auxiliary data product %s in %s', os.path.basename(antpos_file), tarfilename)
else:
LOG.info('Auxiliary data product antennapos.csv does not exist')
# Save continuum regions file
if os.path.exists(cont_file):
tar.add(cont_file, arcname=os.path.basename(cont_file))
LOG.info('Saving auxiliary data product %s in %s', os.path.basename(cont_file), tarfilename)
else:
LOG.info('Auxiliary data product cont.dat does not exist')
# Save target flag files
for flags_file in targetflags_filelist:
if os.path.exists(flags_file):
tar.add(flags_file, arcname=os.path.basename(flags_file))
LOG.info('Saving auxiliary data product %s in %s', os.path.basename(flags_file), tarfilename)
else:
LOG.info('Auxiliary data product flagging target templates file does not exist')
tar.close()
finally:
# Restore the original current working directory
os.chdir(cwd)
return tarfilename
def _make_pipe_manifest(self, context, oussid, stdfproducts, sessiondict, msvisdict, exportmses, calvisdict,
                        exportcalprods, calimages, targetimages):
    """
    Build the pipeline manifest describing the exported products.

    :return: the populated manifest object created by ``_init_pipemanifest``
    """
    # Calibrator images whose name starts with the OUS id (or with
    # 'oussid' / 'unknown') are per-OUS images, the others are per-MS images.
    ous_prefixes = (oussid, 'oussid', 'unknown')
    per_ous_calimages = []
    per_ms_calimages = []
    for image in calimages:
        bucket = per_ous_calimages if image.startswith(ous_prefixes) else per_ms_calimages
        bucket.append(image)

    # Manifest document and its top level OUS status element.
    pipemanifest = self._init_pipemanifest(oussid)
    ouss = pipemanifest.set_ous(oussid)
    pipemanifest.add_casa_version(ouss, environment.casa_version_string)
    pipemanifest.add_pipeline_version(ouss, environment.pipeline_revision)
    pipemanifest.add_procedure_name(ouss, context.project_structure.recipe_name)
    pipemanifest.add_environment_info(ouss)

    if stdfproducts.ppr_file:
        pipemanifest.add_pprfile(ouss, os.path.basename(stdfproducts.ppr_file))

    # Flagging and calibration products, session by session.
    for session_name, session_entry in sessiondict.items():
        session = pipemanifest.set_session(ouss, session_name)
        if exportcalprods:
            pipemanifest.add_caltables(session, session_entry[1])
        for vis_name in session_entry[0]:
            immatchlist = [imname for imname in per_ms_calimages if imname.startswith(vis_name)]
            ms_file = msvisdict[vis_name] if exportmses else None
            if exportcalprods:
                flags_file, calapply_file = calvisdict[vis_name]
            else:
                flags_file, calapply_file = None, None
            pipemanifest.add_asdm_imlist(session, vis_name, ms_file, flags_file, calapply_file, immatchlist,
                                         'calibrator')

    # Weblog tarball, commands log and pipeline script.
    pipemanifest.add_weblog(ouss, os.path.basename(stdfproducts.weblog_file))
    pipemanifest.add_casa_cmdlog(ouss, os.path.basename(stdfproducts.casa_commands_file))
    pipemanifest.add_pipescript(ouss, os.path.basename(stdfproducts.casa_pipescript))

    # The restore script is only listed when one was exported.
    if stdfproducts.casa_restore_script != 'Undefined':
        pipemanifest.add_restorescript(ouss, os.path.basename(stdfproducts.casa_restore_script))

    # Calibrator and target images.
    pipemanifest.add_images(ouss, per_ous_calimages, 'calibrator')
    pipemanifest.add_images(ouss, targetimages, 'target')

    return pipemanifest
def _init_pipemanifest(self, oussid):
    """
    Create the pipeline manifest object for the given OUS id.
    """
    pipemanifest = manifest.PipelineManifest(oussid)
    return pipemanifest
def _export_pprfile(self, context, output_dir, products_dir, oussid, pprfile):
# Prepare the search template for the pipeline processing request file.
# Was a template in the past
# Forced to one file now but keep the template structure for the moment
if pprfile == '':
ps = context.project_structure
if ps is None or ps.ppr_file == '':
pprtemplate = None
else:
pprtemplate = os.path.basename(ps.ppr_file)
else:
pprtemplate = os.path.basename(pprfile)
# Locate the pipeline processing request(s) and generate a list
# to be copied to the data products directory. Normally there
# should be only one match but if there are more copy them all.
pprmatches = []
if pprtemplate is not None:
for file in os.listdir(output_dir):
if fnmatch.fnmatch(file, pprtemplate):
LOG.debug('Located pipeline processing request {}'.format(file))
pprmatches.append(os.path.join(output_dir, file))
# Copy the pipeline processing request files.
pprmatchesout = []
for file in pprmatches:
if oussid:
outfile = os.path.join(products_dir, oussid + '.pprequest.xml')
else:
outfile = file
pprmatchesout.append(outfile)
LOG.info('Copying pipeline processing file {} to {}'.format(os.path.basename(file),
os.path.basename(outfile)))
if not self._executor._dry_run:
shutil.copy(file, outfile)
return pprmatchesout
def _export_final_ms(self, context, vis, products_dir):
    """
    Save the ms to a compressed tarfile in products.

    :param context: pipeline context; its ``output_dir`` is the directory
        the MS is archived from
    :param vis: path of the measurement set
    :param products_dir: directory receiving the tarball
    :return: name of the tarball (``<vis base name>.tgz``); during a dry
        run the name is returned without anything being written
    """
    # Save the current working directory and move to the pipeline
    # working directory. This is required for tarfile IO
    cwd = os.getcwd()
    try:
        os.chdir(context.output_dir)

        # Define the name of the output tarfile
        visname = os.path.basename(vis)
        tarfilename = visname + '.tgz'
        LOG.info('Storing final ms %s in %s' % (visname, tarfilename))

        # Create the tar file. The context manager closes the archive even
        # when adding the MS fails, so the file handle is never leaked.
        if not self._executor._dry_run:
            with tarfile.open(os.path.join(products_dir, tarfilename), "w:gz") as tar:
                tar.add(visname)
    finally:
        # Restore the original current working directory
        os.chdir(cwd)

    return tarfilename
def _save_final_flagversion(self, vis, flag_version_name):
    """
    Store the current flags of ``vis`` under the given flag version name.

    The flagmanager job is only created and executed outside a dry run.
    """
    LOG.info('Saving final flags for %s in flag version %s' %
             (os.path.basename(vis), flag_version_name))
    if self._executor._dry_run:
        return
    job = casa_tasks.flagmanager(vis=vis, mode='save', versionname=flag_version_name)
    self._executor.execute(job)
def _export_final_flagversion(self, context, vis, flag_version_name,
                              products_dir):
    """
    Save the final flags version to a compressed tarfile in products.

    The archive holds the ``flags.<flag_version_name>`` directory of the
    MS flag versions plus a generated ``FLAG_VERSION_LIST`` file with a
    single entry for that version.

    :return: name of the tarball (``<vis base name>.flagversions.tgz``);
        nothing is written during a dry run
    """
    # Save the current working directory and move to the pipeline
    # working directory. This is required for tarfile IO
    cwd = os.getcwd()
    try:
        os.chdir(context.output_dir)

        # Define the name of the output tarfile
        visname = os.path.basename(vis)
        tarfilename = visname + '.flagversions.tgz'
        LOG.info('Storing final flags for {} in {}'.format(visname, tarfilename))

        # Define the directory to be saved
        flagsname = os.path.join(visname + '.flagversions', 'flags.' + flag_version_name)
        LOG.info('Saving flag version {}'.format(flag_version_name))

        # Define the versions list file to be saved. sys.stdout.encoding can
        # be None (e.g. when stdout has been replaced), so fall back to UTF-8.
        flag_version_list = os.path.join(visname + '.flagversions', 'FLAG_VERSION_LIST')
        ti = tarfile.TarInfo(flag_version_list)
        encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'
        line = "{} : Final pipeline flags\n".format(flag_version_name).encode(encoding)
        ti.size = len(line)
        LOG.info('Saving flag version list')

        # Create the tar file; the context manager closes it even on error.
        if not self._executor._dry_run:
            with tarfile.open(os.path.join(products_dir, tarfilename), "w:gz") as tar:
                tar.add(flagsname)
                tar.addfile(ti, io.BytesIO(line))
    finally:
        # Restore the original current working directory, also when the
        # export fails part way through.
        os.chdir(cwd)

    return tarfilename
def _export_final_applylist(self, context, vis, products_dir, imaging=False):
    """
    Save the final calibration list to a file. For now this is
    a text file. Eventually it will be the CASA callibrary file.

    :param imaging: selects the auxiliary (``auxcalapply``) file name
    :return: name of the apply file; the planned name during a dry run, or
        'Undefined' when there is nothing to apply or the calibration state
        cannot be retrieved or written
    """
    applyfile_name = self.NameBuilder.calapply_list(vis=vis, aux_product=imaging)
    LOG.info('Storing calibration apply list for %s in %s',
             os.path.basename(vis), applyfile_name)

    if self._executor._dry_run:
        return applyfile_name

    try:
        calto = callibrary.CalTo(vis=vis)
        applied_calstate = context.callibrary.applied.trimmed(context, calto)

        # Log the list in human readable form and count the entries.
        nitems = 0
        for target, calfroms in applied_calstate.merged().items():
            LOG.info('Apply to: Field: %s Spw: %s Antenna: %s',
                     target.field, target.spw, target.antenna)
            nitems += 1
            for item in calfroms:
                LOG.info(' Gaintable: %s Caltype: %s Gainfield: %s Spwmap: %s Interp: %s',
                         os.path.basename(item.gaintable),
                         item.caltype,
                         item.gainfield,
                         item.spwmap,
                         item.interp)

        # Only write a file when there is at least one calibration.
        if nitems > 0:
            with open(os.path.join(products_dir, applyfile_name), "w") as applyfile:
                applyfile.write('# Apply file for %s\n' % (os.path.basename(vis)))
                applyfile.write(applied_calstate.as_applycal())
        else:
            applyfile_name = 'Undefined'
            LOG.info('No calibrations for MS %s' % os.path.basename(vis))
    except Exception:
        # Best effort: a missing or unreadable calibration state is reported
        # as 'Undefined'. Exception (rather than a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        applyfile_name = 'Undefined'
        LOG.info('No calibrations for MS %s' % os.path.basename(vis))

    return applyfile_name
def _get_sessions(self, context, sessions, vis):
"""
Return a list of sessions where each element of the list contains
the vis files associated with that session. In future this routine
will be driven by the context but for now use the user defined sessions
"""
# If the input session list is empty put all the visibility files
# in the same session.
if len(sessions) == 0:
wksessions = []
for visname in vis:
session = context.observing_run.get_ms(name=visname).session
wksessions.append(session)
else:
wksessions = sessions
# Determine the number of unique sessions.
session_seqno = 0; session_dict = {}
for i in range(len(wksessions)):
if wksessions[i] not in session_dict:
session_dict[wksessions[i]] = session_seqno
session_seqno = session_seqno + 1
# Initialize the output session names and visibility file lists
session_names = []
session_vis_list = []
for key, value in sorted(session_dict.items(), key=lambda k_v: (k_v[1], k_v[0])):
session_names.append(key)
session_vis_list.append([])
# Assign the visibility files to the correct session
for j in range(len(vis)):
# Match the session names if possible
if j < len(wksessions):
for i in range(len(session_names)):
if wksessions[j] == session_names[i]:
session_vis_list[i].append(vis[j])
# Assign to the last session
else:
session_vis_list[len(session_names)-1].append(vis[j])
# Log the sessions
for i in range(len(session_vis_list)):
LOG.info('Visibility list for session %s is %s' % \
(session_names[i], session_vis_list[i]))
return wksessions, session_names, session_vis_list
def _export_final_calfiles(self, context, oussid, session, vislist, products_dir, imaging=False):
    """
    Save the final calibration tables in a tarfile one file
    per session.

    :param imaging: selects the auxiliary (``auxcaltables``) file name
    :return: name of the tarball; the planned name during a dry run, or
        'Undefined' when no caltable was found for any vis of the session
    """
    # Save the current working directory and move to the pipeline
    # working directory. This is required for tarfile IO
    cwd = os.getcwd()
    try:
        os.chdir(context.output_dir)

        # Define the name of the output tarfile
        tarfilename = self.NameBuilder.caltables(ousstatus_entity_id=oussid,
                                                 session_name=session,
                                                 aux_product=imaging)
        LOG.info('Saving final caltables for %s in %s', session, tarfilename)

        if self._executor._dry_run:
            return tarfilename

        # Collect the applied caltables of every vis in the session.
        caltables = set()
        for visfile in vislist:
            LOG.info('Collecting final caltables for %s in %s',
                     os.path.basename(visfile), tarfilename)
            try:
                calto = callibrary.CalTo(vis=visfile)
                calstate = context.callibrary.applied.trimmed(context, calto)
                caltables.update(calstate.get_caltable())
            except Exception:
                # Best effort: a vis without a retrievable calibration state
                # is skipped. Exception (rather than a bare except) lets
                # KeyboardInterrupt and SystemExit propagate.
                LOG.info('No caltables for MS %s' % os.path.basename(visfile))

        if not caltables:
            return 'Undefined'

        with tarfile.open(os.path.join(products_dir, tarfilename), 'w:gz') as tar:
            # Sort the set so the archive member order is reproducible.
            for table in sorted(caltables):
                tar.add(table, arcname=os.path.basename(table))

        return tarfilename
    finally:
        # Restore the original current working directory
        os.chdir(cwd)
def _export_weblog(self, context, products_dir, oussid):
    """
    Save the processing web log to a tarfile

    The archived directory is ``<parent of context.report_dir>/html``,
    taken relative to ``context.output_dir``.

    :return: name of the weblog tarball; nothing is written during a dry run
    """
    # Save the current working directory and move to the pipeline
    # working directory. This is required for tarfile IO
    cwd = os.getcwd()
    try:
        os.chdir(context.output_dir)

        # Define the name of the output tarfile
        ps = context.project_structure
        tarfilename = self.NameBuilder.weblog(project_structure=ps,
                                              ousstatus_entity_id=oussid)
        LOG.info('Saving final weblog in %s' % (tarfilename))

        # Create the tar file; the context manager closes it even on error.
        if not self._executor._dry_run:
            with tarfile.open(os.path.join(products_dir, tarfilename), "w:gz") as tar:
                tar.add(os.path.join(os.path.basename(os.path.dirname(context.report_dir)), 'html'))
    finally:
        # Restore the original current working directory, also when the
        # export fails part way through.
        os.chdir(cwd)

    return tarfilename
def _export_casa_commands_log(self, context, casalog_name, products_dir, oussid):
    """
    Copy the CASA commands log into the products directory.

    The log is read from context.report_dir; the output file name is
    obtained from the name builder. No copy is made in dry-run mode.

    Returns:
        The base name of the exported commands log.
    """
    source_path = os.path.join(context.report_dir, casalog_name)

    # Output naming is delegated to the name builder.
    target_path = self.NameBuilder.casa_script(
        casalog_name,
        project_structure=context.project_structure,
        ousstatus_entity_id=oussid,
        output_dir=products_dir)

    LOG.info('Copying casa commands log {} to {}'.format(source_path, target_path))

    exported_name = os.path.basename(target_path)
    if self._executor._dry_run:
        return exported_name

    shutil.copy(source_path, target_path)
    return exported_name
def _export_casa_restore_script(self, context, script_name, products_dir, oussid, vislist, session_list):
    """
    Save the CASA restore script.

    A short script calling hif_restoredata is generated and written to
    context.report_dir. Unless the executor is in dry-run mode, it is then
    copied to the products directory under the name given by the name
    builder.

    Args:
        context: pipeline context; report_dir and project_structure are
            read from it.
        script_name: file name of the restore script.
        products_dir: directory the script is copied to.
        oussid: OUS status ID passed to the name builder.
        vislist: measurement set names; a trailing '.ms' is removed
            before the names are written into the script.
        session_list: value written as the 'session' argument of the
            hif_restoredata call.

    Returns:
        The base name of the exported restore script.
    """
    script_file = os.path.join(context.report_dir, script_name)

    # Get the output file name
    ps = context.project_structure
    out_script_file = self.NameBuilder.casa_script(script_name,
                                                   project_structure=ps,
                                                   ousstatus_entity_id=oussid,
                                                   output_dir=products_dir)

    LOG.info('Creating casa restore script {}'.format(script_file))

    # This is hardcoded.
    # ALMA default
    ocorr_mode = 'ca'

    # The restore call receives the MS names without the '.ms' extension.
    tmpvislist = []
    for vis in vislist:
        filename = os.path.basename(vis)
        if filename.endswith('.ms'):
            filename = os.path.splitext(filename)[0]
        tmpvislist.append(filename)

    task_string = "hif_restoredata(vis=%s, session=%s, ocorr_mode='%s')" % (
        tmpvislist, session_list, ocorr_mode)

    # Build the script line by line with an explicit indent so that the
    # bodies of the try/finally clauses are always valid Python, rather
    # than relying on whitespace embedded in a multi-line string literal.
    indent = ' ' * 4
    script_lines = [
        '__rethrow_casa_exceptions = True',
        'h_init()',
        'try:',
        indent + task_string,
        'finally:',
        indent + 'h_save()',
        '',  # keeps the trailing newline at the end of the file
    ]
    template = '\n'.join(script_lines)

    with open(script_file, 'w') as casa_restore_file:
        casa_restore_file.write(template)

    LOG.info('Copying casa restore script %s to %s' %
             (script_file, out_script_file))
    if not self._executor._dry_run:
        shutil.copy(script_file, out_script_file)

    return os.path.basename(out_script_file)
def _export_casa_script(self, context, casascript_name, products_dir, oussid):
    """
    Copy the CASA script into the products directory.

    The script is taken from context.report_dir and copied to the file
    name returned by the name builder. No copy is made in dry-run mode.

    Returns:
        The base name of the exported script.
    """
    project_structure = context.project_structure
    source_path = os.path.join(context.report_dir, casascript_name)

    # Output naming is delegated to the name builder.
    target_path = self.NameBuilder.casa_script(
        casascript_name,
        project_structure=project_structure,
        ousstatus_entity_id=oussid,
        output_dir=products_dir)

    LOG.info('Copying casa script file %s to %s' % (source_path, target_path))

    exported_name = os.path.basename(target_path)
    if self._executor._dry_run:
        return exported_name

    shutil.copy(source_path, target_path)
    return exported_name
def _export_pipe_manifest(self, oussid, manifest_name, products_dir, pipemanifest):
    """
    Write the pipeline manifest file.

    The output file name comes from the name builder. The manifest object
    is only written when the executor is not in dry-run mode.

    Returns:
        The manifest file name exactly as returned by the name builder.
    """
    manifest_path = self.NameBuilder.manifest(
        manifest_name,
        ousstatus_entity_id=oussid,
        output_dir=products_dir)

    LOG.info('Creating manifest file {}'.format(manifest_path))

    dry_run = self._executor._dry_run
    if not dry_run:
        pipemanifest.write(manifest_path)

    return manifest_path
def _export_images(self, context, calimages, calintents, images,
                   products_dir):
    """
    Export the images to FITS files.

    The method first builds a list of (image name, version) pairs and then
    converts every listed image that exists on disk to FITS with the
    exportfits task.

    Args:
        context: pipeline context; calimlist, sciimlist and output_dir are
            read from it.
        calimages: when true, calibrator images are exported, otherwise
            target images.
        calintents: comma separated list of calibrator intents to export;
            an empty string selects PHASE, BANDPASS, CHECK and AMPLITUDE.
            Only used when calimages is true and images is empty.
        images: explicit list of image names. When empty, the images are
            taken from the image list stored in the context.
        products_dir: directory passed to fitsname() to build the FITS
            file names.

    Returns:
        A tuple (image list, FITS list): a deep copy of the image list in
        which every entry carries 'fitsfiles' and 'auxfitsfiles' keys, and
        the list of FITS files for which exportfits was executed (empty in
        dry-run mode).
    """
    # Create the image list
    images_list = []
    if len(images) == 0:
        # Get the image library
        if calimages:
            LOG.info('Exporting calibrator source images')
            if calintents == '':
                intents = ['PHASE', 'BANDPASS', 'CHECK', 'AMPLITUDE']
            else:
                intents = calintents.split(',')
            cleanlist = context.calimlist.get_imlist()
        else:
            LOG.info('Exporting target source images')
            intents = ['TARGET']
            cleanlist = context.sciimlist.get_imlist()
        for image_number, image in enumerate(cleanlist):
            # We need to store the image
            # The two keys are (re)initialised for every entry, including
            # entries whose sourcetype is not selected below.
            cleanlist[image_number]['fitsfiles'] = []
            cleanlist[image_number]['auxfitsfiles'] = []
            version = image.get('version', 1)
            # Image name probably includes path
            if image['sourcetype'] in intents:
                if image['multiterm']:
                    # One image per Taylor term index: .image.tt0, .image.tt1, ...
                    for nt in range(image['multiterm']):
                        imagename = image['imagename'].replace('.image', '.image.tt%d' % (nt))
                        images_list.append((imagename, version))
                        cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                    # Multi-term images additionally get the .alpha and
                    # .alpha.error images; the name is derived differently
                    # depending on whether the image name contains '.pbcor'.
                    if (image['imagename'].find('.pbcor') != -1):
                        imagename = image['imagename'].replace('.image.pbcor', '.alpha')
                        images_list.append((imagename, version))
                        cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                        imagename = '%s.error' % (image['imagename'].replace('.image.pbcor', '.alpha'))
                        images_list.append((imagename, version))
                        cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                    else:
                        imagename = image['imagename'].replace('.image', '.alpha')
                        images_list.append((imagename, version))
                        cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                        imagename = '%s.error' % (image['imagename'].replace('.image', '.alpha'))
                        images_list.append((imagename, version))
                        cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                elif (image['imagename'].find('image.sd') != -1): # single dish
                    # The image itself plus its companion '.weight' image
                    imagename = image['imagename']
                    images_list.append((imagename, version))
                    cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                    imagename = image['imagename'].replace('image.sd', 'image.sd.weight')
                    images_list.append((imagename, version))
                    cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                else:
                    imagename = image['imagename']
                    images_list.append((imagename, version))
                    cleanlist[image_number]['fitsfiles'].append(fitsname(products_dir, imagename, version))
                # Add PBs for interferometry
                # The PB image is recorded as an auxiliary product; for
                # multi-term images only the .pb.tt0 image is listed.
                if (image['imagename'].find('.image') != -1) and (image['imagename'].find('.image.sd') == -1):
                    if (image['imagename'].find('.pbcor') != -1):
                        if (image['multiterm']):
                            imagename = image['imagename'].replace('.image.pbcor', '.pb.tt0')
                            images_list.append((imagename, version))
                            cleanlist[image_number]['auxfitsfiles'].append(fitsname(products_dir, imagename, version))
                        else:
                            imagename = image['imagename'].replace('.image.pbcor', '.pb')
                            images_list.append((imagename, version))
                            cleanlist[image_number]['auxfitsfiles'].append(fitsname(products_dir, imagename, version))
                    else:
                        if (image['multiterm']):
                            imagename = image['imagename'].replace('.image', '.pb.tt0')
                            images_list.append((imagename, version))
                            cleanlist[image_number]['auxfitsfiles'].append(fitsname(products_dir, imagename, version))
                        else:
                            imagename = image['imagename'].replace('.image', '.pb')
                            images_list.append((imagename, version))
                            cleanlist[image_number]['auxfitsfiles'].append(fitsname(products_dir, imagename, version))
                # Add auto-boxing masks for interferometry
                # A '.mask' image is only listed when it exists on disk and
                # no corresponding '.cleanmask' image exists.
                if (image['imagename'].find('.image') != -1) and (image['imagename'].find('.image.sd') == -1):
                    if (image['imagename'].find('.pbcor') != -1):
                        imagename = image['imagename'].replace('.image.pbcor', '.mask')
                        imagename2 = image['imagename'].replace('.image.pbcor', '.cleanmask')
                        if os.path.exists(imagename) and not os.path.exists(imagename2):
                            images_list.append((imagename, version))
                            cleanlist[image_number]['auxfitsfiles'].append(fitsname(products_dir, imagename, version))
                    else:
                        imagename = image['imagename'].replace('.image', '.mask')
                        imagename2 = image['imagename'].replace('.image', '.cleanmask')
                        if os.path.exists(imagename) and not os.path.exists(imagename2):
                            images_list.append((imagename, version))
                            cleanlist[image_number]['auxfitsfiles'].append(fitsname(products_dir, imagename, version))
    else:
        # Assume only the root image name was given.
        # A temporary image library is built from the user supplied names;
        # source name and spw list are not known here.
        cleanlib = imagelibrary.ImageLibrary()
        for image in images:
            if calimages:
                imageitem = imagelibrary.ImageItem(imagename=image,
                                                   sourcename='UNKNOWN',
                                                   spwlist='UNKNOWN',
                                                   sourcetype='CALIBRATOR')
            else:
                imageitem = imagelibrary.ImageItem(imagename=image,
                                                   sourcename='UNKNOWN',
                                                   spwlist='UNKNOWN',
                                                   sourcetype='TARGET')
            cleanlib.add_item(imageitem)
            # os.path.basename() is empty only when the name is empty or
            # ends with a path separator; in that case the name is joined
            # with the pipeline output directory.
            # NOTE(review): the intent of this special case is not clear
            # from this method alone - confirm.
            if os.path.basename(image) == '':
                images_list.append((os.path.join(context.output_dir, image), 1))
            else:
                images_list.append((image, 1))
        cleanlist = cleanlib.get_imlist()
        # Need to add the FITS names
        # NOTE(review): this pairs cleanlist[i] with images_list[i], i.e. it
        # assumes get_imlist() returns one entry per added image, in the
        # order they were added - confirm against ImageLibrary.
        for i in range(len(cleanlist)):
            cleanlist[i]['fitsfiles'] = [fitsname(products_dir, images_list[i][0])]
            cleanlist[i]['auxfitsfiles'] = []
    # Convert to FITS.
    fits_list = []
    for image_ver in images_list:
        image, version = image_ver
        # NOTE(review): this writes to stdout instead of going through LOG.
        print('Working on {}'.format(image))
        fitsfile = fitsname(products_dir, image, version)
        # skip if image doesn't exist
        if not os.path.exists(image):
            LOG.info('Skipping unexisting image {}'.format(os.path.basename(image)))
            continue
        LOG.info('Saving final image {} to FITS file {}'.format(os.path.basename(image),
                                                                os.path.basename(fitsfile)))
        # PIPE-325: abbreviate 'spw' for FITS header when spw string is "too long"
        # The spw ids are sorted numerically and only the lowest and highest
        # are kept, e.g. '3,...,41'.
        # NOTE(review): the image miscinfo is rewritten here regardless of
        # dry-run mode - confirm that this is intended.
        with casa_tools.ImageReader(image) as img:
            info = img.miscinfo()
            if ('spw' in info) and (len(info['spw']) >= 68):
                spw_sorted = sorted([int(x) for x in info['spw'].split(',')])
                info['spw'] = '{},...,{}'.format(spw_sorted[0], spw_sorted[-1])
                img.setmiscinfo(info)
        if not self._executor._dry_run:
            task = casa_tasks.exportfits(imagename=image, fitsimage=fitsfile, velocity=False, optical=False,
                                         bitpix=-32, minpix=0, maxpix=-1, overwrite=True, dropstokes=False,
                                         stokeslast=True)
            self._executor.execute(task)
            fits_list.append(fitsfile)
    # Hand back a deep copy of the annotated image list.
    new_cleanlist = copy.deepcopy(cleanlist)
    return new_cleanlist, fits_list
@staticmethod
def _add_to_manifest(manifest_file, aux_fproducts, aux_caltablesdict, aux_calapplysdict, aqua_report):
    """
    Add auxiliary product entries to an existing manifest file.

    The manifest is read from manifest_file, extended with the AQUA
    report, the auxiliary products file and the per-session auxiliary
    caltables (each only when the corresponding argument is non-empty),
    and written back to the same file.

    Args:
        manifest_file: manifest XML file, read and then overwritten.
        aux_fproducts: auxiliary products file; only its base name is
            recorded.
        aux_caltablesdict: mapping of session name to a sequence whose
            item 0 is the list of vis names and item 1 is the auxiliary
            caltables entry for that session.
        aux_calapplysdict: mapping of vis name to the value recorded with
            that vis.
        aqua_report: AQUA report file; only its base name is recorded.
    """
    pipemanifest = manifest.PipelineManifest('')
    pipemanifest.import_xml(manifest_file)
    ous_node = pipemanifest.get_ous()

    if aqua_report:
        pipemanifest.add_aqua_report(ous_node, os.path.basename(aqua_report))

    # Add auxiliary data products file
    if aux_fproducts:
        pipemanifest.add_aux_products_file(ous_node, os.path.basename(aux_fproducts))

    # Add the auxiliary caltables
    if aux_caltablesdict:
        for session_name, session_entry in aux_caltablesdict.items():
            # Reuse the session element if present, otherwise create it
            session_node = pipemanifest.get_session(ous_node, session_name)
            if session_node is None:
                session_node = pipemanifest.set_session(ous_node, session_name)

            pipemanifest.add_auxcaltables(session_node, session_entry[1])
            for vis_name in session_entry[0]:
                pipemanifest.add_auxasdm(session_node, vis_name, aux_calapplysdict[vis_name])

    pipemanifest.write(manifest_file)