import contextlib
import os
import shutil
import tarfile
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.mpihelpers as mpihelpers
import pipeline.infrastructure.tablereader as tablereader
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
from . import fluxes
# Public names exported by ``from <module> import *``.
__all__ = ['ImportData', 'ImportDataInputs', 'ImportDataResults']

# Module logger obtained from the pipeline infrastructure.
LOG = infrastructure.get_logger(__name__)
class ImportDataResults(basetask.Results):
    """
    Results of the ImportData task.

    Holds the MeasurementSet domain objects created by the task and,
    optionally, the SetJy results generated from flux entries in Source.xml.

    Attributes:
        mses: list of MeasurementSet domain objects (empty by default).
        setjy_results: SetJy results to merge into the context, or None.
        origin: dict mapping an MS basename to the origin of that MS
            (filled in by the task; 'ASDM' or 'MS').
        fluxservice: flux service query status; None (dbservice=False),
            FIRSTURL, BACKUPURL, or FAIL.
    """

    def __init__(self, mses=None, setjy_results=None):
        super(ImportDataResults, self).__init__()
        self.mses = mses if mses is not None else []
        self.setjy_results = setjy_results
        self.origin = {}
        # Flux service query is None (dbservice=False), FIRSTURL, BACKUPURL, or FAIL
        self.fluxservice = None

    def merge_with_context(self, context):
        """
        Register every MS with the context's observing run, then let each
        SetJy result (if any) merge itself into the context.
        """
        observing_run = context.observing_run
        for measurement_set in self.mses:
            LOG.info('Adding {0} to context'.format(measurement_set.name))
            observing_run.add_measurement_set(measurement_set)

        # A None or empty setjy_results contributes nothing.
        for setjy_result in self.setjy_results or ():
            setjy_result.merge_with_context(context)

    def __repr__(self):
        names = '\n\t'.join(measurement_set.name for measurement_set in self.mses)
        return 'ImportDataResults:\n\t{0}'.format(names)
def _fingerprinted_directories(names, identifiers):
    """
    Return the set of parent directories in *names* that contain every entry
    listed in *identifiers*.

    Each name is split into (dirname, basename). A dirname qualifies only when
    every identifier occurs among its basenames. Repeated entries (e.g. a tar
    member appended more than once) are counted once, so they can neither
    complete nor spoil a match.
    """
    required = frozenset(identifiers)
    found = {}
    for name in names:
        base = os.path.basename(name)
        if base in required:
            found.setdefault(os.path.dirname(name), set()).add(base)
    return {directory for directory, bases in found.items() if bases == required}


@task_registry.set_equivalent_casa_task('h_importdata')
@task_registry.set_casa_commands_comment('If required, ASDMs are converted to MeasurementSets.')
class ImportData(basetask.StandardTaskTemplate):
    """
    Import MeasurementSets and/or ASDMs named by ``inputs.vis``.

    ``vis`` may be a tar archive, which is extracted into ``inputs.output_dir``.
    Otherwise it is a directory whose immediate entries are inspected.
    MeasurementSets are imported directly (after being copied into the output
    directory unless ``nocopy`` is set). ASDMs are first converted with the
    CASA ``importasdm`` task.
    """
    Inputs = ImportDataInputs
    Results = ImportDataResults

    # (filename suffix, title) of the user flagging template files created
    # next to each converted ASDM when save_flagonline is enabled.
    _FLAG_TEMPLATES = (
        ('.flagtemplate.txt',
         'User flagging commands file for the calibration pipeline'),
        ('.flagtsystemplate.txt',
         'User Tsys flagging commands file for the calibration pipeline'),
        ('.flagtargetstemplate.txt',
         'User flagging commands file for the imaging pipeline'),
    )

    @staticmethod
    def _ms_directories(names):
        """
        Inspect a list of file entries, finding the root directory of any
        measurement sets present via a set of characteristic files and
        directories.
        """
        identifiers = ('SOURCE', 'FIELD', 'ANTENNA', 'DATA_DESCRIPTION')
        return _fingerprinted_directories(names, identifiers)

    @staticmethod
    def _asdm_directories(members):
        """
        Inspect a list of file entries, finding the root directory of any
        ASDMs present via a set of characteristic files and directories.
        """
        identifiers = ('ASDMBinary', 'Main.xml', 'ASDM.xml', 'Antenna.xml')
        return _fingerprinted_directories(members, identifiers)

    def prepare(self, **parameters):
        """
        Locate, relocate/convert and load the data set(s) named by inputs.vis.

        Returns:
            An instance of ``self.Results`` holding the new MeasurementSet
            domain objects, the origin ('ASDM' or 'MS') of each MS keyed by
            basename, and the flux results from ``_get_fluxes``. In dry-run
            mode an empty result is returned.

        Raises:
            ValueError: if inputs.vis is None.
            IOError: if inputs.vis does not exist.
            TypeError: if no MeasurementSet or ASDM is found in inputs.vis.
        """
        inputs = self.inputs
        vis = inputs.vis

        if vis is None:
            msg = 'Empty input data set list'
            LOG.warning(msg)
            raise ValueError(msg)

        if not os.path.exists(vis):
            msg = 'Input data set \'{0}\' not found'.format(vis)
            LOG.error(msg)
            raise IOError(msg)

        results = self.Results()

        to_import, to_convert = self._locate_datasets(vis)
        if not to_import and not to_convert:
            raise TypeError('{!s} is of unhandled type'.format(vis))

        # Work with absolute paths from here on; MSes outside the output
        # directory are replaced by their relocated copies.
        to_import = self._relocate_to_output_dir(
            [os.path.abspath(f) for f in to_import])

        # launch an import job for each ASDM we need to convert
        for asdm in to_convert:
            self._do_importasdm(asdm)

        # Now everything is in MS format, add the converted MSes to the list.
        # _asdm_to_vis_filename only uses the ASDM basename, so no extra path
        # joining is needed here.
        converted_asdms = [self._asdm_to_vis_filename(asdm) for asdm in to_convert]
        to_import.extend(converted_asdms)
        # Absolute MS paths of converted ASDMs, compared against ms.name below
        # to record the origin of each MS.
        converted_asdm_abspaths = {os.path.abspath(f) for f in converted_asdms}

        LOG.info('Creating pipeline objects for measurement set(s) %s',
                 ', '.join(to_import))
        if self._executor._dry_run:
            # Nothing was extracted, copied or converted; return an empty
            # result of the task's declared Results type.
            return results

        observing_run = tablereader.ObservingRunReader.get_observing_run(
            [os.path.abspath(f) for f in to_import])

        for ms in observing_run.measurement_sets:
            LOG.debug('Setting session to %s for %s', inputs.session, ms.basename)
            if inputs.asimaging:
                LOG.info('Importing %s as an imaging measurement set', ms.basename)
                ms.is_imaging_ms = True
            ms.session = inputs.session
            results.origin[ms.basename] = 'ASDM' if ms.name in converted_asdm_abspaths else 'MS'

        fluxservice, combined_results = self._get_fluxes(inputs.context, observing_run)

        results.mses.extend(observing_run.measurement_sets)
        results.setjy_results = combined_results
        results.fluxservice = fluxservice

        return results

    def analyse(self, result):
        """No post-processing: the result is returned unchanged."""
        return result

    def _locate_datasets(self, vis):
        """
        Split the contents of *vis* into MSes to import and ASDMs to convert.

        If *vis* is a tar archive, the member names are analysed and resolved
        relative to inputs.output_dir, and (unless this is a dry run) the
        archive is extracted there. Otherwise *vis* is assumed to be a
        directory ready to be imported, or in the case of an ASDM, converted
        then imported. Only its immediate entries are inspected.

        Returns:
            (to_import, to_convert) collections of paths.
        """
        output_dir = self.inputs.output_dir
        if os.path.isfile(vis) and tarfile.is_tarfile(vis):
            with tarfile.open(vis) as tar:
                to_import, to_convert = self._analyse_filenames(tar.getnames(), vis)
                to_convert = [os.path.join(output_dir, asdm) for asdm in to_convert]
                to_import = [os.path.join(output_dir, ms) for ms in to_import]
                if not self._executor._dry_run:
                    LOG.info('Extracting %s to %s', vis, output_dir)
                    # NOTE(review): extractall trusts member names; an archive
                    # with absolute paths or '..' components can write outside
                    # output_dir. Consider tarfile extraction filters
                    # (filter='data') if archives may be untrusted.
                    tar.extractall(path=output_dir)
            return to_import, to_convert

        filenames = [os.path.join(vis, f) for f in os.listdir(vis)]
        return self._analyse_filenames(filenames, vis)

    def _relocate_to_output_dir(self, to_import):
        """
        Copy MSes that lie outside inputs.output_dir into it.

        *to_import* must hold absolute paths. It is updated in place and
        returned: each relocated MS is replaced by its destination path. An
        existing destination is reused as-is. Nothing is copied in dry-run
        mode, and nothing is relocated when inputs.nocopy is anything but
        False.
        """
        inputs = self.inputs
        if inputs.nocopy is not False:
            return to_import

        output_dir = os.path.abspath(inputs.output_dir)
        # Compare whole path components. A plain string-prefix test would treat
        # '/data/run2/x.ms' as inside '/data/run', and never matches a
        # relative output_dir because the candidate paths are absolute.
        to_copy = {f for f in to_import
                   if os.path.commonpath([f, output_dir]) != output_dir}

        for src in to_copy:
            dst = os.path.join(output_dir, os.path.basename(src))
            to_import.remove(src)
            to_import.append(dst)

            if os.path.exists(dst):
                LOG.warning('%s already in %s. Will import existing data.',
                            os.path.basename(src), inputs.output_dir)
                continue

            if not self._executor._dry_run:
                LOG.info('Copying %s to %s', src, inputs.output_dir)
                shutil.copytree(src, dst)

        return to_import

    def _get_fluxes(self, context, observing_run):
        """
        Collect flux measurements for the MSes of *observing_run*.

        Returns:
            (fluxservice, combined_results). fluxservice is always None here
            (no flux service is queried). combined_results are the values
            re-read by fluxes.import_flux from context.output_dir.
        """
        # get the flux measurements from Source.xml for each MS
        xml_results = fluxes.get_setjy_results(observing_run.measurement_sets)
        # write/append them to flux.csv

        # Cycle 1 hack for exporting the field intents to the CSV file:
        # export_flux_from_result queries the context, so we pseudo-register
        # the mses with the context by replacing the original observing run
        orig_observing_run = context.observing_run
        context.observing_run = observing_run
        try:
            fluxes.export_flux_from_result(xml_results, context)
        finally:
            context.observing_run = orig_observing_run

        # re-read from flux.csv, which will include any user-coded values
        combined_results = fluxes.import_flux(context.output_dir, observing_run)

        # Flux service not used, return None by default
        return None, combined_results

    def _analyse_filenames(self, filenames, vis):
        """
        Classify *filenames* into MS directories to import and ASDM
        directories to convert. *vis* is only used in log messages.

        Returns:
            (to_import, to_convert) as sets of paths. MS paths are normalised.
        """
        to_import = set()
        to_convert = set()

        ms_dirs = self._ms_directories(filenames)
        if ms_dirs:
            LOG.debug('Adding measurement set(s) %s from %s to import queue',
                      ', '.join(os.path.basename(f) for f in ms_dirs), vis)
            to_import.update(os.path.normpath(d) for d in ms_dirs)

        asdm_dirs = self._asdm_directories(filenames)
        if asdm_dirs:
            LOG.debug('Adding ASDMs %s from %s to conversion queue',
                      ', '.join(asdm_dirs), vis)
            to_convert.update(asdm_dirs)

        return to_import, to_convert

    def _asdm_to_vis_filename(self, asdm):
        """Return '<output_dir>/<basename(asdm)>.ms', the MS path for *asdm*."""
        return '{0}.ms'.format(os.path.join(self.inputs.output_dir,
                                            os.path.basename(asdm)))

    def _do_importasdm(self, asdm):
        """
        Convert *asdm* into a MeasurementSet with the CASA importasdm task.

        When inputs.save_flagonline is set, the user flagging template files
        are created first, and importasdm is asked (savecmds/outfile) to write
        online flag commands to '<output_dir>/<asdm>.flagonline.txt'. After a
        real (non dry-run) conversion, Source.xml, SpectralWindow.xml and
        DataDescription.xml are copied from the ASDM into the new MS when
        present.
        """
        inputs = self.inputs
        asdm_name = os.path.basename(asdm)
        vis = self._asdm_to_vis_filename(asdm)
        outfile = os.path.join(inputs.output_dir, asdm_name + '.flagonline.txt')

        if inputs.save_flagonline:
            # Calibration, Tsys and imaging-target flagging templates.
            for suffix, title in self._FLAG_TEMPLATES:
                self._make_template_flagfile(
                    os.path.join(inputs.output_dir, asdm_name + suffix), title)

        createmms = mpihelpers.parse_mpi_input_parameter(inputs.createmms)
        with_pointing_correction = getattr(inputs, 'with_pointing_correction', False)

        task = casa_tasks.importasdm(asdm=asdm,
                                     vis=vis,
                                     savecmds=inputs.save_flagonline,
                                     outfile=outfile,
                                     process_caldevice=inputs.process_caldevice,
                                     asis=inputs.asis,
                                     overwrite=inputs.overwrite,
                                     bdfflags=inputs.bdfflags,
                                     lazy=inputs.lazy,
                                     with_pointing_correction=with_pointing_correction,
                                     ocorr_mode=inputs.ocorr_mode,
                                     createmms=createmms)

        self._executor.execute(task)

        if self._executor._dry_run:
            # importasdm was not actually run, so the MS directory may not
            # exist; copying the XML tables into it would fail.
            return

        # Source.xml is later read for flux values (see _get_fluxes).
        for xml_filename in ('Source.xml', 'SpectralWindow.xml', 'DataDescription.xml'):
            asdm_source = os.path.join(asdm, xml_filename)
            if os.path.exists(asdm_source):
                vis_source = os.path.join(vis, xml_filename)
                LOG.info('Copying %s from ASDM to measurement set', xml_filename)
                LOG.trace('Copying %s: %s to %s', xml_filename, asdm_source, vis_source)
                shutil.copyfile(asdm_source, vis_source)

    def _make_template_flagfile(self, outfile, titlestr):
        """
        Write FLAGGING_TEMPLATE_HEADER, titled *titlestr*, to *outfile*.

        The file is written when inputs.overwrite is true OR the file does not
        already exist. An existing file is kept when overwrite is false.
        """
        inputs = self.inputs
        if inputs.overwrite or not os.path.exists(outfile):
            template_text = FLAGGING_TEMPLATE_HEADER.replace('___TITLESTR___', titlestr)
            with open(outfile, 'w') as f:
                f.write(template_text)
# Text written into each user flagging template file by
# ImportData._make_template_flagfile. The ___TITLESTR___ placeholder is
# replaced with the file-specific title. Every line starts with '#', so a
# freshly created template contains only commented-out example commands.
FLAGGING_TEMPLATE_HEADER = '''#
# ___TITLESTR___
#
# Examples
# Note: Do not put spaces inside the reason string !
#
# mode='manual' antenna='DV02;DV03&DA51' spw='22,24:150~175' reason='QA2:applycal_amplitude_frequency'
#
# mode='manual' spw='22' field='1' timerange='2018/02/10/00:01:01.0959~2018/02/10/00:01:01.0961' reason='QA2:timegaincal_phase_time'
#
# TP flagging: The 'other' option is intended for bad TP pointing
# mode='manual' antenna='PM01&&PM01' reason='QA2:other_bad_pointing'
#
# Tsys flagging:
# mode='manual' antenna='DV02;DV03&DA51' spw='22,24' reason='QA2:tsysflag_tsys_frequency'
'''