Source code for pipeline.domain.observingrun

import operator
import os

import itertools

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.utils as utils

LOG = infrastructure.get_logger(__name__)


[docs]class ObservingRun(object): def __init__(self): self.measurement_sets = [] self.virtual_science_spw_ids = {} self.virtual_science_spw_names = {} self.virtual_science_spw_shortnames = {}
[docs] def add_measurement_set(self, ms): if ms.basename in [m.basename for m in self.measurement_sets]: msg = '{0} is already in the pipeline context'.format(ms.name) LOG.error(msg) raise Exception(msg) # Initialise virtual science spw IDs from first MS if self.measurement_sets == []: self.virtual_science_spw_ids = \ dict((int(s.id), s.name) for s in ms.get_spectral_windows(science_windows_only=True)) self.virtual_science_spw_names = \ dict((s.name, int(s.id)) for s in ms.get_spectral_windows(science_windows_only=True)) self.virtual_science_spw_shortnames = {} for name in self.virtual_science_spw_names: if 'ALMA' in name: i = name.rfind('#') if i != -1: self.virtual_science_spw_shortnames[name] = name[:i] else: self.virtual_science_spw_shortnames[name] = name else: self.virtual_science_spw_shortnames[name] = name else: for s in ms.get_spectral_windows(science_windows_only=True): if s.name not in self.virtual_science_spw_names: msg = 'Science spw name {0} (ID {1}) of EB {2} does not match spw names of first EB. Virtual spw' \ ' ID mapping will not work.'.format(s.name, s.id, os.path.basename(ms.name).replace('.ms', '')) LOG.error(msg) self.measurement_sets.append(ms)
[docs] def get_ms(self, name=None, intent=None): """Returns the first measurement set matching the given identifier. Identifier precedence is name then intent. """ if name: for ms in self.measurement_sets: if name in (ms.name, ms.basename): return ms for ms in self.measurement_sets: # single dish data are registered without the MS suffix with_suffix = '%s.ms' % name if with_suffix in (ms.name, ms.basename): return ms raise KeyError('No measurement set found with name {0}'.format(name)) if intent: # Remove any extraneous characters, as intent could be specified # as *BANDPASS* for example intent = intent.replace('*', '') for ms in self.measurement_sets: for field in ms.fields: if intent in field.intents: return ms raise KeyError('No measurement set found with intent {0}'.format(intent))
[docs] def get_measurement_sets(self, names=None, intents=None, fields=None, imaging_preferred=False): """ Returns measurement sets matching the given arguments. """ candidates = self.measurement_sets # filter out MeasurementSets with no vis hits if names is not None: candidates = [ms for ms in candidates if ms.name in names] # filter out MeasurementSets with no intent hits if intents is not None: if isinstance(intents, str): intents = utils.safe_split(intents) intents = set(intents) candidates = [ms for ms in candidates if intents.issubset(ms.intents)] # filter out MeasurementSets with no field name hits if fields is not None: if isinstance(fields, str): fields = utils.safe_split(fields) fields_to_match = set(fields) candidates = [ms for ms in candidates if fields_to_match.isdisjoint({field.name for field in ms.fields})] # When requested, and if any imaging MeasurementSets have been # registered with the context, filter out the non-imaging objects if imaging_preferred: imaging_flags = [getattr(ms, 'is_imaging_ms', False) for ms in candidates] if any(imaging_flags): candidates = [ms for ms, is_imaging_ms in zip(candidates, imaging_flags) if is_imaging_ms] return candidates
[docs] def get_fields(self, names=None): """ Returns fields matching the given arguments from all measurement sets. """ match = [ms.fields for ms in self.measurement_sets] # flatten the fields lists to one sequence match = itertools.chain(*match) if names is not None: if isinstance(names, str): names = utils.safe_split(names) names = set(names) match = [f for f in match if f.name in names] return match
[docs] @staticmethod def get_real_spw_id_by_name(spw_name, target_ms): """ :param spw_name: the spw name to convert :type spw_name: string :param target_ms: the MS to map spw_name to :type target_ms: domain.MeasurementSet """ spw_id = None for spw in target_ms.spectral_windows: if spw.name == spw_name: spw_id = spw.id return spw_id
[docs] def get_virtual_spw_id_by_name(self, spw_name): """ :param spw_name: the spw name to convert :type spw_name: string """ return self.virtual_science_spw_names.get(spw_name, None)
[docs] def virtual2real_spw_id(self, spw_id, target_ms): """ :param spw_id: the spw id to convert :type spw_id: integer :param target_ms: the MS to map spw_id to :type target_ms: domain.MeasurementSet """ return self.get_real_spw_id_by_name(self.virtual_science_spw_ids.get(int(spw_id), None), target_ms)
[docs] def real2virtual_spw_id(self, spw_id, target_ms): """ :param spw_id: the spw id to convert :type spw_id: integer :param target_ms: the MS to map spw_id to :type target_ms: domain.MeasurementSet """ return self.get_virtual_spw_id_by_name(target_ms.get_spectral_window(int(spw_id)).name)
[docs] def get_real_spwsel(self, spwsel, vis): """ :param spwsel: the list of spw selections to convert :type spwsel: list of strings :param vis: the list of MS names to map spwsel to :type vis: list of MS names """ real_spwsel = [] for spwsel_item, ms_name in zip(spwsel, vis): real_spwsel_items = [] for spw_item in spwsel_item.split(','): if spw_item.find(':') == -1: real_spw_id = self.virtual2real_spw_id(int(spw_item), self.get_ms(ms_name)) real_spwsel_items.append(str(real_spw_id)) else: virtual_spw_id, selection = spw_item.split(':') real_spw_id = self.virtual2real_spw_id(int(virtual_spw_id), self.get_ms(ms_name)) real_spwsel_items.append('%s:%s' % (str(real_spw_id), selection)) real_spwsel.append(','.join(real_spwsel_items)) return real_spwsel
@property def start_time(self): if not self.measurement_sets: return None earliest, _ = min([(ms, utils.get_epoch_as_datetime(ms.start_time)) for ms in self.measurement_sets], key=operator.itemgetter(1)) return earliest.start_time @property def start_datetime(self): if not self.start_time: return None return utils.get_epoch_as_datetime(self.start_time) @property def end_time(self): if not self.measurement_sets: return None latest, _ = max([(ms, utils.get_epoch_as_datetime(ms.end_time)) for ms in self.measurement_sets], key=operator.itemgetter(1)) return latest.end_time @property def end_datetime(self): if not self.end_time: return None return utils.get_epoch_as_datetime(self.end_time) @property def project_ids(self): return {ms.project_id for ms in self.measurement_sets} @property def schedblock_ids(self): return {ms.schedblock_id for ms in self.measurement_sets} @property def execblock_ids(self): return {ms.execblock_id for ms in self.measurement_sets} @property def observers(self): return {ms.observer for ms in self.measurement_sets}