import collections
import errno
import fnmatch
import os
import shutil
import tarfile
import pyfits
import pipeline as pipeline
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline import environment
from pipeline.h.tasks.common import manifest
from pipeline.h.tasks.exportdata import exportdata
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry
from pipeline.infrastructure import utils
# Module-level logger for this task module.
LOG = infrastructure.get_logger(__name__)

# Container for the standard per-OUS file products exported by this task:
# the pipeline processing request, the weblog tarball, the casa_commands
# log and the casa pipescript (all as file names/paths).
StdFileProducts = collections.namedtuple('StdFileProducts', 'ppr_file weblog_file casa_commands_file casa_pipescript')
class ExportvlassdataResults(basetask.Results):
    """Results object for the hifv_exportvlassdata pipeline task.

    Attributes populated later by Exportvlassdata.prepare() include the
    basenames of the exported products (weblog, pipescript, commandslog,
    manifest and, when a PPR exists, pprequest).
    """

    def __init__(self, final=None, pool=None, preceding=None):
        """Initialise the results object.

        Args:
            final: list of final results (copied; defaults to empty).
            pool: list of candidate results (copied; defaults to empty).
            preceding: list of preceding results (copied; defaults to empty).
        """
        super(ExportvlassdataResults, self).__init__()
        # None defaults (rather than mutable []) avoid the shared
        # mutable-default-argument pitfall; each instance gets its own lists,
        # and supplied sequences are copied as before.
        self.pool = list(pool) if pool is not None else []
        self.final = list(final) if final is not None else []
        self.preceding = list(preceding) if preceding is not None else []
        self.error = set()

    def __repr__(self):
        return 'ExportvlassdataResults:'
@task_registry.set_equivalent_casa_task('hifv_exportvlassdata')
class Exportvlassdata(basetask.StandardTaskTemplate):
    """Pipeline task that exports VLASS data products (FITS subimages,
    weblog tarball, CASA logs/scripts and the pipeline manifest) to the
    products directory.
    """

    # Inputs class for this task; declared elsewhere in this module
    # (not visible in this chunk).
    Inputs = ExportvlassdataInputs

    # Product-name builder shared with the generic exportdata task.
    # NOTE(review): 'PipelineProductNameBuiler' (sic) matches the spelling
    # of the attribute in pipeline.h.tasks.exportdata.exportdata -- do not
    # "fix" the spelling here without renaming it there too.
    NameBuilder = exportdata.PipelineProductNameBuiler
def prepare(self):
    """Export the VLASS data products.

    Creates the products directory, exports the standard per-OUS products
    (PPR, weblog tarball, casa_commands log, pipescript), exports each
    subimage as pbcor/rms (and, for VLASS-SE-CONT, alpha and alpha-error)
    FITS files, applies VLASS-QL position/header corrections, and writes
    the pipeline manifest.

    Returns:
        ExportvlassdataResults with product basenames attached.
    """
    LOG.info("This Exportvlassdata class is running.")

    # Create a local alias for inputs, so we're not saying
    # 'self.inputs' everywhere
    inputs = self.inputs

    try:
        LOG.trace('Creating products directory: {!s}'.format(inputs.products_dir))
        os.makedirs(inputs.products_dir)
    except OSError as exc:
        # An already-existing products directory is fine; anything else
        # (permissions, bad path, ...) is a real error.
        if exc.errno == errno.EEXIST:
            pass
        else:
            raise

    # Initialize the standard ous is string.
    oussid = self.get_oussid(inputs.context)

    # Define the results object
    result = ExportvlassdataResults()

    # Make the standard vislist and the sessions lists.
    session_list, session_names, session_vislists, vislist = self._make_lists(inputs.context, inputs.session,
                                                                              inputs.vis)

    # Export the standard per OUS file products:
    #   the pipeline processing request, a compressed tarfile of the weblog,
    #   the pipeline processing script, and the CASA commands log.
    recipe_name = self.get_recipename(inputs.context)
    if not recipe_name:
        prefix = oussid
    else:
        prefix = oussid + '.' + recipe_name
    stdfproducts = self._do_standard_ous_products(inputs.context, prefix, inputs.pprfile, inputs.output_dir,
                                                  inputs.products_dir)
    # In interactive mode there is no PPR, so the attribute may be absent.
    if stdfproducts.ppr_file:
        result.pprequest = os.path.basename(stdfproducts.ppr_file)
    result.weblog = os.path.basename(stdfproducts.weblog_file)
    result.pipescript = os.path.basename(stdfproducts.casa_pipescript)
    result.commandslog = os.path.basename(stdfproducts.casa_commands_file)

    imlist = self.inputs.context.subimlist.get_imlist()

    # PIPE-592: find out imaging mode (stored in context by hif_editimlist)
    if hasattr(self.inputs.context, 'imaging_mode'):
        img_mode = self.inputs.context.imaging_mode
    else:
        LOG.warn("imaging_mode property does not exist in context, alpha images will not be written.")
        img_mode = None

    # Build the list of CASA images to export: pbcor and rms subimages,
    # plus alpha / alpha-error for multiterm images in VLASS-SE-CONT mode.
    images_list = []
    for imageitem in imlist:
        if imageitem['multiterm']:
            pbcor_image_name = imageitem['imagename'].replace('subim', 'pbcor.tt0.subim')
            rms_image_name = imageitem['imagename'].replace('subim', 'pbcor.tt0.rms.subim')
            image_bundle = [pbcor_image_name, rms_image_name]
            # PIPE-592: save VLASS SE alpha and alpha error images
            if img_mode == 'VLASS-SE-CONT':
                alpha_image_name = imageitem['imagename'].replace('.image.subim', '.alpha.subim')
                alpha_image_error_name = imageitem['imagename'].replace('.image.subim', '.alpha.error.subim')
                image_bundle.extend([alpha_image_name, alpha_image_error_name])
        else:
            pbcor_image_name = imageitem['imagename'].replace('subim', 'pbcor.subim')
            rms_image_name = imageitem['imagename'].replace('subim', 'pbcor.rms.subim')
            image_bundle = [pbcor_image_name, rms_image_name]
        images_list.extend(image_bundle)

    # Export each image to FITS in the products directory.
    fits_list = []
    for image in images_list:
        fitsfile = os.path.join(inputs.products_dir, image + '.fits')
        task = casa_tasks.exportfits(imagename=image, fitsimage=fitsfile)
        self._executor.execute(task)
        LOG.info('Wrote {ff}'.format(ff=fitsfile))
        fits_list.append(fitsfile)

        # Apply position corrections to VLASS-QL product images (PIPE-587) and fix FITS header (PIPE-641)
        # NOTE(review): the source indentation was ambiguous here; this block
        # is placed inside the loop so every exported FITS file (not just the
        # last) is corrected -- confirm against the original file.
        if img_mode == 'VLASS-QL':
            # Mean antenna geographic coordinates
            observatory = casa_tools.measures.observatory(self.inputs.context.project_summary.telescope)
            # Mean observing date
            start_time = self.inputs.context.observing_run.start_datetime
            end_time = self.inputs.context.observing_run.end_datetime
            mid_time = start_time + (end_time - start_time) / 2
            mid_time = casa_tools.measures.epoch('utc', mid_time.isoformat())
            # Correction
            utils.positioncorrection.do_wide_field_pos_cor(fitsfile, date_time=mid_time, obs_long=observatory['m0'],
                                                           obs_lat=observatory['m1'])
            # PIPE-641: update FITS header for VLASS-QL
            self._fix_vlass_fits_header(self.inputs.context, fitsfile)

    # Export the pipeline manifest file
    # TBD Remove support for auxiliary data products to the individual pipelines
    pipemanifest = self._make_pipe_manifest(inputs.context, oussid, stdfproducts, {}, {}, [], fits_list)
    casa_pipe_manifest = self._export_pipe_manifest('pipeline_manifest.xml', inputs.products_dir, pipemanifest)
    result.manifest = os.path.basename(casa_pipe_manifest)

    # Return the results object, which will be used for the weblog
    return result
def analyse(self, results):
    """Pass-through analysis stage: the prepared results need no
    post-processing, so they are returned unchanged."""
    analysed = results
    return analysed
def get_oussid(self, context):
    """Return the OUS status entity id, sanitized for use as a file-name
    prefix ( ':' and '/' become '_' ), or 'unknown' when unavailable."""
    project_structure = context.project_structure
    if project_structure is None or project_structure.ousstatus_entity_id == 'unknown':
        return 'unknown'
    return project_structure.ousstatus_entity_id.translate(str.maketrans(':/', '__'))
def get_recipename(self, context):
    """Return the recipe name from the project structure, or '' when the
    project structure is missing or the recipe is 'Undefined'."""
    project_structure = context.project_structure
    if project_structure is None or project_structure.recipe_name == 'Undefined':
        return ''
    return project_structure.recipe_name
def _make_lists(self, context, session, vis, imaging=False):
"""
Create the vis and sessions lists
"""
# Force inputs.vis to be a list.
vislist = vis
if isinstance(vislist, str):
vislist = [vislist, ]
if imaging:
vislist = [vis for vis in vislist if context.observing_run.get_ms(name=vis).is_imaging_ms]
else:
vislist = [vis for vis in vislist if not context.observing_run.get_ms(name=vis).is_imaging_ms]
# Get the session list and the visibility files associated with
# each session.
session_list, session_names, session_vislists = self._get_sessions(context, session, vislist)
return session_list, session_names, session_vislists, vislist
def _do_standard_ous_products(self, context, oussid, pprfile, output_dir, products_dir):
    """Export the standard per-OUS products and return them as a
    StdFileProducts named tuple (ppr_file may be None)."""
    # Locate and copy the pipeline processing request. Normally there is at
    # most one; in interactive mode there is none at all.
    ppr_candidates = self._export_pprfile(context, output_dir, products_dir, oussid, pprfile)
    ppr_file = os.path.basename(ppr_candidates[0]) if ppr_candidates else None

    # Tar file of the web log.
    weblog_file = self._export_weblog(context, products_dir, oussid)

    # Processing log and processing script, exported independently of the
    # web log.
    casa_commands_file = self._export_casa_commands_log(context, context.logs['casa_commands'],
                                                        products_dir, oussid)
    casa_pipescript = self._export_casa_script(context, context.logs['pipeline_script'],
                                               products_dir, oussid)

    return StdFileProducts(ppr_file, weblog_file, casa_commands_file, casa_pipescript)
def _make_pipe_manifest(self, context, oussid, stdfproducts, sessiondict, visdict, calimages, targetimages):
    """Assemble and return the pipeline manifest document for this OUS."""
    # Manifest document and its top-level OUS status entry.
    pipemanifest = self._init_pipemanifest(oussid)
    ouss = pipemanifest.set_ous(oussid)
    pipemanifest.add_casa_version(ouss, environment.casa_version_string)
    pipemanifest.add_pipeline_version(ouss, pipeline.revision)
    pipemanifest.add_procedure_name(ouss, context.project_structure.recipe_name)

    if stdfproducts.ppr_file:
        pipemanifest.add_pprfile(ouss, os.path.basename(stdfproducts.ppr_file))

    # Flagging and calibration products, grouped per session.
    # Each sessiondict entry is indexed as [0] = vis names, [1] = caltables.
    for session_name, session_entry in sessiondict.items():
        session = pipemanifest.set_session(ouss, session_name)
        pipemanifest.add_caltables(session, session_entry[1])
        for vis_name in session_entry[0]:
            vis_entry = visdict[vis_name]
            pipemanifest.add_asdm(session, vis_name, vis_entry[0], vis_entry[1])

    # Weblog tarball, commands log and pipescript (basenames only).
    pipemanifest.add_weblog(ouss, os.path.basename(stdfproducts.weblog_file))
    pipemanifest.add_casa_cmdlog(ouss, os.path.basename(stdfproducts.casa_commands_file))
    pipemanifest.add_pipescript(ouss, os.path.basename(stdfproducts.casa_pipescript))

    # Calibrator and target images.
    pipemanifest.add_images(ouss, calimages, 'calibrator')
    pipemanifest.add_images(ouss, targetimages, 'target')

    return pipemanifest
def _init_pipemanifest(self, oussid):
    """Return a new, empty pipeline manifest keyed by the OUS status id."""
    return manifest.PipelineManifest(oussid)
def _export_pprfile(self, context, output_dir, products_dir, oussid, pprfile):
    """Copy the pipeline processing request file(s) into products_dir.

    Returns the list of output paths; empty when no PPR exists (e.g. in
    interactive mode).
    """
    # Determine the match template for the PPR file name. This used to be a
    # real glob template; it is now a single file name but the matching
    # structure is kept for the moment.
    if pprfile == '':
        project_structure = context.project_structure
        if project_structure is None or project_structure.ppr_file == '':
            pprtemplate = None
        else:
            pprtemplate = os.path.basename(project_structure.ppr_file)
    else:
        pprtemplate = os.path.basename(pprfile)

    # Collect every file in output_dir matching the template. Normally one
    # match is expected, but all matches are copied.
    pprmatches = []
    if pprtemplate is not None:
        for candidate in os.listdir(output_dir):
            if fnmatch.fnmatch(candidate, pprtemplate):
                LOG.debug('Located pipeline processing request %s' % candidate)
                pprmatches.append(os.path.join(output_dir, candidate))

    # Copy each match to its destination name (oussid-prefixed when known).
    pprmatchesout = []
    for srcfile in pprmatches:
        outfile = os.path.join(products_dir, oussid + '.pprequest.xml') if oussid else srcfile
        pprmatchesout.append(outfile)
        LOG.info('Copying pipeline processing file %s to %s' % (os.path.basename(srcfile),
                                                                os.path.basename(outfile)))
        if not self._executor._dry_run:
            shutil.copy(srcfile, outfile)
    return pprmatchesout
def _get_sessions(self, context, sessions, vis):
    """Group the visibility files by session.

    Returns (working_sessions, session_names, session_vis_lists). When
    *sessions* is empty, the session of each vis file is looked up in the
    context; otherwise the user-supplied session list is used as-is.
    """
    # Working session list: one entry per vis when none were supplied.
    if len(sessions) == 0:
        wksessions = [context.observing_run.get_ms(name=visname).session for visname in vis]
    else:
        wksessions = sessions

    # Unique session names, ordered by first appearance, with one (initially
    # empty) vis list per session.
    session_names = list(dict.fromkeys(wksessions))
    session_vis_list = [[] for _ in session_names]
    name_to_index = {name: idx for idx, name in enumerate(session_names)}

    # Assign each vis file to its session; any vis beyond the end of the
    # session list falls into the last session.
    for position, visname in enumerate(vis):
        if position < len(wksessions):
            idx = name_to_index.get(wksessions[position])
            if idx is not None:
                session_vis_list[idx].append(visname)
        else:
            session_vis_list[-1].append(visname)

    # Log the per-session assignments.
    for name, vis_files in zip(session_names, session_vis_list):
        LOG.info('Visibility list for session %s is %s' % (name, vis_files))
    return wksessions, session_names, session_vis_list
def _export_weblog(self, context, products_dir, oussid):
    """Save the processing web log to a gzipped tarfile in products_dir.

    Returns:
        The tarfile name (basename only).
    """
    # tarfile member paths are built relative to the pipeline working
    # directory, so temporarily chdir there. The original code did not
    # restore the caller's cwd if tar creation raised; the try/finally
    # guarantees restoration on every exit path.
    cwd = os.getcwd()
    try:
        os.chdir(context.output_dir)

        # Name of the output tarfile (e.g. '<oussid>.weblog.tgz', or
        # 'weblog.tgz' when the project structure is unknown).
        ps = context.project_structure
        tarfilename = self.NameBuilder.weblog(project_structure=ps,
                                              ousstatus_entity_id=oussid)
        LOG.info('Saving final weblog in %s' % tarfilename)

        # Create the tar file; the context manager also guarantees the
        # archive is closed if tar.add() fails.
        if not self._executor._dry_run:
            with tarfile.open(os.path.join(products_dir, tarfilename), "w:gz") as tar:
                tar.add(os.path.join(os.path.basename(os.path.dirname(context.report_dir)), 'html'))
    finally:
        # Restore the original current working directory.
        os.chdir(cwd)
    return tarfilename
def _export_casa_commands_log(self, context, casalog_name, products_dir, oussid):
    """Copy the CASA commands log into products_dir.

    Returns:
        The basename of the copied log file.
    """
    project_structure = context.project_structure
    # Source lives in the report directory; the output name is delegated to
    # the shared product-name builder.
    source_log = os.path.join(context.report_dir, casalog_name)
    destination_log = self.NameBuilder.casa_script(casalog_name,
                                                   project_structure=project_structure,
                                                   ousstatus_entity_id=oussid,
                                                   output_dir=products_dir)
    LOG.info('Copying casa commands log %s to %s' % (source_log, destination_log))
    if not self._executor._dry_run:
        shutil.copy(source_log, destination_log)
    return os.path.basename(destination_log)
def _export_casa_script(self, context, casascript_name, products_dir, oussid):
    """Copy the CASA pipeline script into products_dir.

    Returns:
        The basename of the copied script file.
    """
    project_structure = context.project_structure
    # Source lives in the report directory; the output name is delegated to
    # the shared product-name builder.
    source_script = os.path.join(context.report_dir, casascript_name)
    destination_script = self.NameBuilder.casa_script(casascript_name,
                                                      project_structure=project_structure,
                                                      ousstatus_entity_id=oussid,
                                                      output_dir=products_dir)
    LOG.info('Copying casa script file %s to %s' % (source_script, destination_script))
    if not self._executor._dry_run:
        shutil.copy(source_script, destination_script)
    return os.path.basename(destination_script)
def _export_pipe_manifest(self, manifest_name, products_dir, pipemanifest):
    """Write the pipeline manifest XML to products_dir.

    Returns:
        The full path of the written manifest file.
    """
    out_path = os.path.join(products_dir, manifest_name)
    LOG.info('Creating manifest file %s' % out_path)
    dry_run = self._executor._dry_run
    if not dry_run:
        pipemanifest.write(out_path)
    return out_path
def _fix_vlass_fits_header(self, context, fitsname):
    """
    Update VLASS FITS product header according to PIPE-641.
    Should be called in VLASS-QL imaging mode.
    The following keywords are changed: DATE-OBS, DATE-END, RADESYS, OBJECT.
    """
    # Guard clause: nothing to do if the image is missing.
    # (LOG.warning replaces the deprecated LOG.warn alias.)
    if not os.path.exists(fitsname):
        LOG.warning('FITS header cannot be updated: image {} does not exist.'.format(fitsname))
        return

    # Open FITS image in update mode; the try/finally guarantees the file is
    # closed even if one of the header updates raises.
    hdulist = pyfits.open(fitsname, mode='update')
    try:
        header = hdulist[0].header

        # DATE-OBS and DATE-END keywords.
        # Note: the new DATE-OBS value (first scan start time) might differ
        # from the original value (first un-flagged scan start time).
        header['date-obs'] = (infrastructure.utils.get_epoch_as_datetime(
            context.observing_run.start_time).isoformat(), 'First scan started')
        date_end = ('date-end', infrastructure.utils.get_epoch_as_datetime(
            context.observing_run.end_time).isoformat(), 'Last scan finished')
        # Update DATE-END in place if present, otherwise insert it right
        # after DATE-OBS. (any() avoids materialising a throwaway list of
        # lower-cased keys.)
        if any(key.lower() == 'date-end' for key in header.keys()):
            header['date-end'] = date_end[1:]
        else:
            pos = header.index('date-obs')
            header.insert(pos, date_end, after=True)

        # RADESYS: relabel FK5 as ICRS.
        if header['radesys'].upper() == 'FK5':
            header['radesys'] = 'ICRS'

        # OBJECT keyword: keep it in sync with FILNAM05.
        if header['object'].upper() != header['filnam05'].upper():
            header['object'] = header['filnam05']

        # Save changes and inform log.
        hdulist.flush()
        LOG.info("Header updated in {}".format(fitsname))
    finally:
        hdulist.close()
    return