Source code for pipeline.infrastructure.tablereader

import collections
import datetime
import glob
import itertools
import operator
import os
import re
import xml.etree.ElementTree as ElementTree
from bisect import bisect_left
from functools import reduce

import cachetools
import numpy

import pipeline.domain as domain
import pipeline.domain.measures as measures
from . import casa_tools
from . import logging

LOG = logging.get_logger(__name__)


[docs]def find_EVLA_band(frequency, bandlimits=None, BBAND='?4PLSCXUKAQ?'): """identify VLA band""" if bandlimits is None: bandlimits = [0.0e6, 150.0e6, 700.0e6, 2.0e9, 4.0e9, 8.0e9, 12.0e9, 18.0e9, 26.5e9, 40.0e9, 56.0e9] i = bisect_left(bandlimits, frequency) return BBAND[i]
def _get_ms_name(ms): return ms.name if isinstance(ms, domain.MeasurementSet) else ms def _get_ms_basename(ms): return ms.basename if isinstance(ms, domain.MeasurementSet) else ms def _get_science_goal_value(science_goals, goal_keyword): value = None for science_goal in science_goals: keyword = science_goal.split('=')[0].replace(' ', '') if keyword != goal_keyword: continue if keyword == 'representativeSource': value = science_goal.split('=')[1].lstrip().rstrip() else: value = science_goal.split('=')[1].replace(' ', '') return value return value
[docs]class ObservingRunReader(object):
[docs] @staticmethod def get_observing_run(ms_files): if isinstance(ms_files, str): ms_files = [ms_files] observing_run = domain.ObservingRun() for ms_file in ms_files: ms = MeasurementSetReader.get_measurement_set(ms_file) observing_run.add_measurement_set(ms) return observing_run
[docs]class MeasurementSetReader(object):
[docs] @staticmethod def get_scans(msmd, ms): LOG.debug('Analysing scans in {0}'.format(ms.name)) with casa_tools.TableReader(ms.name) as openms: scan_number_col = openms.getcol('SCAN_NUMBER') time_col = openms.getcol('TIME') antenna1_col = openms.getcol('ANTENNA1') antenna2_col = openms.getcol('ANTENNA2') data_desc_id_col = openms.getcol('DATA_DESC_ID') # get columns and tools needed to create scan times time_colkeywords = openms.getcolkeywords('TIME') time_unit = time_colkeywords['QuantumUnits'][0] time_ref = time_colkeywords['MEASINFO']['Ref'] mt = casa_tools.measures qt = casa_tools.quanta scans = [] statesforscans = msmd.statesforscans() fieldsforscans = msmd.fieldsforscans(asmap=True, arrayid=0, obsid=0) spwsforscans = msmd.spwsforscans() for scan_id in msmd.scannumbers(): states = [s for s in ms.states if s.id in statesforscans[str(scan_id)]] intents = reduce(lambda s, t: s.union(t.intents), states, set()) fields = [f for f in ms.fields if f.id in fieldsforscans[str(scan_id)]] # can't use msmd.timesforscan as we need unique times grouped # by spw # scan_times = msmd.timesforscan(scan_id) exposures = {spw_id: msmd.exposuretime(scan=scan_id, spwid=spw_id) for spw_id in spwsforscans[str(scan_id)]} scan_mask = (scan_number_col == scan_id) # get the antennas used for this scan LOG.trace('Calculating antennas used for scan %s', scan_id) antenna_ids = set() scan_antenna1 = antenna1_col[scan_mask] scan_antenna2 = antenna2_col[scan_mask] antenna_ids.update(scan_antenna1) antenna_ids.update(scan_antenna2) antennas = [o for o in ms.antennas if o.id in antenna_ids] # get the data descriptions for this scan LOG.trace('Calculating data descriptions used for scan %s', scan_id) scan_data_desc_id = set(data_desc_id_col[scan_mask]) data_descriptions = [o for o in ms.data_descriptions if o.id in scan_data_desc_id] # times are specified per data description, so we must # re-mask and calculate times per dd scan_times = {} LOG.trace('Processing scan times for scan %s', scan_id) for dd in data_descriptions: dd_mask = (scan_number_col == scan_id) & (data_desc_id_col == dd.id) raw_midpoints = list(time_col[dd_mask]) unique_midpoints = set(raw_midpoints) epoch_midpoints = [mt.epoch(time_ref, qt.quantity(o, time_unit)) for o in unique_midpoints] scan_times[dd.spw.id] = list(zip(epoch_midpoints, itertools.repeat(exposures[dd.spw.id]))) LOG.trace('Creating domain object for scan %s', scan_id) scan = domain.Scan(id=scan_id, states=states, fields=fields, data_descriptions=data_descriptions, antennas=antennas, scan_times=scan_times, intents=intents) scans.append(scan) LOG.trace('{0}'.format(scan)) return scans
[docs] @staticmethod def add_band_to_spws(ms): for spw in ms.spectral_windows: if spw.type == 'WVR': spw.band = 'WVR' continue # Expected format is something like ALMA_RB_03#BB_1#SW-01#FULL_RES m = re.search(r'ALMA_RB_(?P<band>\d+)', spw.name) if m: band_str = m.groupdict()['band'] band_num = int(band_str) spw.band = 'ALMA Band %s' % band_num continue spw.band = BandDescriber.get_description(spw.ref_frequency, observatory=ms.antenna_array.name) # Used EVLA band name from spw instead of frequency range observatory = ms.antenna_array.name.upper() if observatory in ('VLA', 'EVLA'): spw2band = ms.get_vla_spw2band() try: EVLA_band = spw2band[spw.id] except: LOG.info('Unable to get band from spw id - using reference frequency instead') freqHz = float(spw.ref_frequency.value) EVLA_band = find_EVLA_band(freqHz) EVLA_band_dict = {'4': '4m (4)', 'P': '90cm (P)', 'L': '20cm (L)', 'S': '13cm (S)', 'C': '6cm (C)', 'X': '3cm (X)', 'U': '2cm (Ku)', 'K': '1.3cm (K)', 'A': '1cm (Ka)', 'Q': '0.7cm (Q)'} spw.band = EVLA_band_dict[EVLA_band]
[docs] @staticmethod def get_measurement_set(ms_file): LOG.info('Analysing {0}'.format(ms_file)) ms = domain.MeasurementSet(ms_file) # populate ms properties with results of table readers with casa_tools.MSMDReader(ms_file) as msmd: LOG.info('Populating ms.antenna_array...') ms.antenna_array = AntennaTable.get_antenna_array(msmd) LOG.info('Populating ms.spectral_windows...') ms.spectral_windows = RetrieveByIndexContainer(SpectralWindowTable.get_spectral_windows(msmd, ms)) LOG.info('Populating ms.states...') ms.states = RetrieveByIndexContainer(StateTable.get_states(msmd)) LOG.info('Populating ms.fields...') ms.fields = RetrieveByIndexContainer(FieldTable.get_fields(msmd)) LOG.info('Populating ms.sources...') ms.sources = RetrieveByIndexContainer(SourceTable.get_sources(msmd)) LOG.info('Populating ms.data_descriptions...') ms.data_descriptions = RetrieveByIndexContainer(DataDescriptionTable.get_descriptions(msmd, ms)) LOG.info('Populating ms.polarizations...') ms.polarizations = PolarizationTable.get_polarizations(msmd) # For now the SBSummary table is ALMA specific if 'ALMA' in msmd.observatorynames(): sbinfo = SBSummaryTable.get_sbsummary_info(ms, msmd.observatorynames()) if sbinfo.repSource is None: LOG.warn('Unable to identify representative target for %s. Will try to fall back to existing' ' science target sources in the imaging tasks.' % ms.basename) else: if sbinfo.repSource == 'none': LOG.warn('Representative target for %s is set to "none". Will try to fall back to existing' ' science target sources or calibrators in the imaging tasks.' % ms.basename) LOG.info('Populating ms.representative_target ...') ms.representative_target = (sbinfo.repSource, sbinfo.repFrequency, sbinfo.repBandwidth) ms.representative_window = sbinfo.repWindow LOG.info('Populating ms.science_goals ...') if sbinfo.minAngResolution is None and sbinfo.maxAngResolution is None: observing_mode = SBSummaryTable.get_observing_mode(ms) # Only warn if the number of 12m antennas is greater than the number of 7m antennas # and if the observation is not single dish if len([a for a in ms.get_antenna() if a.diameter == 12.0]) > \ len([a for a in ms.get_antenna() if a.diameter == 7.0]) \ and 'Standard Single Dish' not in observing_mode: LOG.warn('Undefined angular resolution limits for %s' % ms.basename) ms.science_goals = {'minAcceptableAngResolution': '0.0arcsec', 'maxAcceptableAngResolution': '0.0arcsec'} else: # LOG.info('Populating ms.science_goals ...') ms.science_goals = {'minAcceptableAngResolution': sbinfo.minAngResolution, 'maxAcceptableAngResolution': sbinfo.maxAngResolution} if sbinfo.maxAllowedBeamAxialRatio is None: ms.science_goals['maxAllowedBeamAxialRatio'] = '0.0' else: ms.science_goals['maxAllowedBeamAxialRatio'] = sbinfo.maxAllowedBeamAxialRatio if sbinfo.sensitivity is None: ms.science_goals['sensitivity'] = '0.0mJy' else: ms.science_goals['sensitivity'] = sbinfo.sensitivity if sbinfo.dynamicRange is None: ms.science_goals['dynamicRange'] = '1.0' else: ms.science_goals['dynamicRange'] = sbinfo.dynamicRange ms.science_goals['sbName'] = sbinfo.sbName LOG.info('Populating ms.array_name ...') # No MSMD functions to help populating the ASDM_EXECBLOCK table ms.array_name = ExecblockTable.get_execblock_info(ms) with casa_tools.MSReader(ms.name) as openms: for dd in ms.data_descriptions: openms.selectinit(reset=True) # CAS-11207: from ~CASA 5.3pre89 onwards, getdata fails if # the data selection does not select any datadd is # missing. To compensate for this, selectinit now returns # a boolean that indicates the status of data selection # (True = selection contains data); this can be used to # check that the subsequent getdata call will succeed. if openms.selectinit(datadescid=dd.id): ms_info = openms.getdata(['axis_info', 'time']) dd.obs_time = numpy.mean(ms_info['time']) dd.chan_freq = ms_info['axis_info']['freq_axis']['chan_freq'].tolist() dd.corr_axis = ms_info['axis_info']['corr_axis'].tolist() # now back to pure MSMD calls LOG.info('Linking fields to states...') MeasurementSetReader.link_fields_to_states(msmd, ms) LOG.info('Linking fields to sources...') MeasurementSetReader.link_fields_to_sources(msmd, ms) LOG.info('Linking intents to spws...') MeasurementSetReader.link_intents_to_spws(msmd, ms) LOG.info('Linking spectral windows to fields...') MeasurementSetReader.link_spws_to_fields(msmd, ms) LOG.info('Populating ms.scans...') ms.scans = MeasurementSetReader.get_scans(msmd, ms) (observer, project_id, schedblock_id, execblock_id) = ObservationTable.get_project_info(msmd) MeasurementSetReader.add_band_to_spws(ms) # work around NumPy bug with empty strings # http://projects.scipy.org/numpy/ticket/1239 ms.observer = str(observer) ms.project_id = str(project_id) ms.schedblock_id = schedblock_id ms.execblock_id = execblock_id return ms
@staticmethod def _get_range(filename, column): with casa_tools.MSReader(filename) as ms: data = ms.range([column]) return list(data.values())[0]
[docs]class SpectralWindowTable(object):
[docs] @staticmethod def get_spectral_windows(msmd, ms): # map spw ID to spw type spw_types = {i: 'FDM' for i in msmd.fdmspws()} spw_types.update({i: 'TDM' for i in msmd.tdmspws()}) spw_types.update({i: 'WVR' for i in msmd.wvrspws()}) spw_types.update({i: 'CHANAVG' for i in msmd.chanavgspws()}) spw_types.update({i: 'SQLD' for i in msmd.almaspws(sqld=True)}) # these msmd functions don't need a spw argument. They return a list of # values, one for each spw spw_names = msmd.namesforspws() bandwidths = msmd.bandwidths() # We need the first TARGET source ID to get the correct transitions try: first_target_field_id = msmd.fieldsforintent('*TARGET*')[0] first_target_source_id = msmd.sourceidforfield(first_target_field_id) except: first_target_source_id = 0 target_spw_ids = msmd.spwsforintent('*TARGET*') # Read in information on receiver for current MS. receiver_info = SpectralWindowTable.get_receiver_info(ms) spws = [] for i, spw_name in enumerate(spw_names): # get this spw's values from our precalculated lists and dicts bandwidth = bandwidths[i] spw_type = spw_types.get(i, 'UNKNOWN') # the following msmd functions need a spw argument, so they have # to be contained within the spw loop mean_freq = msmd.meanfreq(i) chan_freqs = msmd.chanfreqs(i) chan_widths = msmd.chanwidths(i) chan_effective_bws = msmd.chaneffbws(i) sideband = msmd.sideband(i) # BBC_NO column is optional if 'NRO' in msmd.observatorynames(): # For Nobeyama (TODO: how to define BBC_NO for NRO) baseband = i else: baseband = msmd.baseband(i) ref_freq = msmd.reffreq(i) # Read transitions for target spws. Other spws may cause severe # messages because the target source IDs may not have the spw. if i in target_spw_ids: try: # TRANSITIONS column does not exist in old data # TODO: Are the transitions of a given spw the same for all # target source IDs ? transitions = msmd.transitions(sourceid=first_target_source_id, spw=i) if transitions is False: transitions = ['Unknown'] except: transitions = ['Unknown'] else: transitions = ['Unknown'] if spw_name in [None, '']: spw_name = 'spw_%s' % str(i) # Extract receiver type and LO frequencies for current spw. try: receiver, freq_lo = receiver_info[i] except KeyError: LOG.info("No receiver info available for MS {} spw id {}".format(_get_ms_basename(ms), i)) receiver, freq_lo = None, None # Store all info in a new SpectralWindow object. spw = domain.SpectralWindow(i, spw_name, spw_type, bandwidth, ref_freq, mean_freq, chan_freqs, chan_widths, chan_effective_bws, sideband, baseband, receiver, freq_lo, transitions=transitions) spws.append(spw) return spws
[docs] @staticmethod def get_receiver_info(ms): """ Extract information about the receiver from the ASDM_RECEIVER table. The following properties are extracted: * receiver type (e.g.: TSB, DSB, NOSB) * local oscillator frequencies If multiple entries are present for the same ASDM spwid, then keep :param ms: measurement set to inspect :return: dict of MS spw: (receiver_type, freq_lo) """ # Get mapping of ASDM spectral window id to MS spectral window id. asdm_to_ms_spw_map = SpectralWindowTable.get_asdm_to_ms_spw_mapping(ms) # Construct path to ASDM_RECEIVER table. msname = _get_ms_name(ms) receiver_table = os.path.join(msname, 'ASDM_RECEIVER') receiver_info = {} try: # Read in required columns from table. with casa_tools.TableReader(receiver_table) as tb: # Extract the ASDM spw ids column. spwids = tb.getcol('spectralWindowId') # Go through the table row-by-row, and extract info for each # ASDM spwid encountered: for i, spwid in enumerate(spwids): # Assume that ASDM spectral windows are stored as a string # such as "SpectralWindow_<nn>", where <nn> is the ID integer. _, asdm_spwid = spwid.split('_') # Get MS spwid corresponding to the current ASDM spwid. ms_spwid = asdm_to_ms_spw_map[int(asdm_spwid)] # Add the information from the current row if either: # a.) no info for the current spwid was stored yet. # b.) info was already stored for the current spwid, but # this info was not for receiver type of "TSB" or "DSB". # This will store one entry for each ASDM spwid encountered, # preferentially the first TSB/DSB row in the table # corresponding to the spwid, but otherwise the first # non-TSB/DSB row corresponding to the spwid. if ms_spwid not in receiver_info or receiver_info[ms_spwid][0] not in ["TSB", "DSB"]: receiver_info[ms_spwid] = (tb.getcell("receiverSideband", i), tb.getcell("freqLO", i)) except: LOG.info("Unable to read receiver info for MS {}".format(_get_ms_basename(ms))) receiver_info = {} return receiver_info
[docs] @staticmethod def parse_spectral_window_ids_from_xml(xml_path): """ Extract the spectral window ID element from each row of an XML file. :param xml_path: path for XML file :return: list of integer spectral window IDs """ ids = [] try: root_element = ElementTree.parse(xml_path) for row in root_element.findall('row'): element = row.findtext('spectralWindowId') _, str_id = element.split('_') ids.append(int(str_id)) except IOError: LOG.info("Could not parse XML at: {}".format(xml_path)) return ids
[docs] @staticmethod def get_data_description_spw_ids(ms): """ Extract a list of spectral window IDs from the DataDescription XML for an ASDM. This function assumes the XML has been copied across to the measurement set directory. :param ms: measurement set to inspect :return: list of integers corresponding to ASDM spectral window IDs """ result = [] xml_path = os.path.join(ms.name, 'DataDescription.xml') if not os.path.exists(xml_path): LOG.info("No DataDescription XML found at {}.".format(xml_path)) else: result = SpectralWindowTable.parse_spectral_window_ids_from_xml(xml_path) return result
[docs] @staticmethod def get_spectral_window_spw_ids(ms): """ Extract a list of spectral window IDs from the SpectralWindow XML for an ASDM. This function assumes the XML has been copied across to the measurement set directory. :param ms: measurement set to inspect :return: list of integers corresponding to ASDM spectral window IDs """ result = [] xml_path = os.path.join(ms.name, 'SpectralWindow.xml') if not os.path.exists(xml_path): LOG.info("No SpectralWindow XML found at {}.".format(xml_path)) else: result = SpectralWindowTable.parse_spectral_window_ids_from_xml(xml_path) return result
[docs] @staticmethod def get_asdm_to_ms_spw_mapping(ms): """ Get the mapping of ASDM spectral window ID to Measurement Set spectral window ID. This function requires the SpectralWindow and DataDescription ASDM XML files to have been copied across to the measurement set directory. :param ms: measurement set to inspect :return: dict of ASDM spw: MS spw """ dd_spws = SpectralWindowTable.get_data_description_spw_ids(ms) spw_spws = SpectralWindowTable.get_spectral_window_spw_ids(ms) asdm_ids = [i for i in spw_spws if i in dd_spws] + [i for i in spw_spws if i not in dd_spws] return {k: v for k, v in zip(asdm_ids, spw_spws)}
[docs]class ObservationTable(object):
[docs] @staticmethod def get_project_info(msmd): project_id = msmd.projects()[0] observer = msmd.observers()[0] schedblock_id = 'N/A' execblock_id = 'N/A' obsnames = msmd.observatorynames() if 'ALMA' in obsnames or 'VLA' in obsnames or 'EVLA' in obsnames: # TODO this would break if > 1 observation in an EB. Can that # ever happen? d = {} for cell in msmd.schedule(0): key, val = cell.split() d[key] = val schedblock_id = d.get('SchedulingBlock', 'N/A') execblock_id = d.get('ExecBlock', 'N/A') return observer, project_id, schedblock_id, execblock_id
[docs]class AntennaTable(object):
[docs] @staticmethod def get_antenna_array(msmd): position = msmd.observatoryposition() names = set(msmd.observatorynames()) assert len(names) is 1 name = names.pop() array = domain.AntennaArray(name, position) # .. and add a new Antenna for each row in the ANTENNA table for antenna in AntennaTable.get_antennas(msmd): array.add_antenna(antenna) return array
[docs] @staticmethod def get_antennas(msmd): antenna_table = os.path.join(msmd.name(), 'ANTENNA') LOG.trace('Opening ANTENNA table to read ANTENNA.FLAG_ROW') with casa_tools.TableReader(antenna_table) as table: flags = table.getcol('FLAG_ROW') antennas = [] for (i, name, station) in zip(msmd.antennaids(), msmd.antennanames(), msmd.antennastations()): # omit this antenna if it has been flagged if flags[i]: continue position = msmd.antennaposition(i) offset = msmd.antennaoffset(i) diameter_m = casa_tools.quanta.convert(msmd.antennadiameter(i), 'm') diameter = casa_tools.quanta.getvalue(diameter_m)[0] antenna = domain.Antenna(i, name, station, position, offset, diameter) antennas.append(antenna) return antennas
@staticmethod def _create_antenna(antenna_id, name, station, diameter, position, offset, flag): # omit this antenna if it has been flagged if flag is True: return return domain.Antenna(antenna_id, name, station, position, offset, diameter)
[docs]class DataDescriptionTable(object):
[docs] @staticmethod def get_descriptions(msmd, ms): spws = ms.spectral_windows # read the data descriptions table and create the objects descriptions = [DataDescriptionTable._create_data_description(spws, *row) for row in DataDescriptionTable._read_table(msmd)] return descriptions
@staticmethod def _create_data_description(spws, dd_id, spw_id, pol_id): # find the SpectralWindow matching the given spectral window ID matching_spws = [spw for spw in spws if spw.id == spw_id] spw = matching_spws[0] return domain.DataDescription(dd_id, spw, pol_id) @staticmethod def _read_table(msmd): """ Read the DATA_DESCRIPTION table of the given measurement set. """ LOG.debug('Analysing DATA_DESCRIPTION table') dd_ids = msmd.datadescids() spw_ids = msmd.spwfordatadesc() pol_ids = msmd.polidfordatadesc() return list(zip(dd_ids, spw_ids, pol_ids))
SBSummaryInfo = collections.namedtuple( 'SBSummaryInfo', 'repSource repFrequency repBandwidth repWindow minAngResolution maxAngResolution ' 'maxAllowedBeamAxialRatio sensitivity dynamicRange sbName')
[docs]class SBSummaryTable(object):
[docs] @staticmethod def get_sbsummary_info(ms, obsnames): try: sbsummary_info = [SBSummaryTable._create_sbsummary_info(*row) for row in SBSummaryTable._read_table(ms)] return sbsummary_info[0] except: if 'ALMA' in obsnames: LOG.warn('Error reading science goals for %s' % ms.basename) return SBSummaryInfo(repSource=None, repFrequency=None, repBandwidth=None, repWindow=None, minAngResolution=None, maxAngResolution=None, maxAllowedBeamAxialRatio=None, sensitivity=None, dynamicRange=None, sbName=None)
[docs] @staticmethod def get_observing_mode(ms): msname = _get_ms_name(ms) sbsummary_table = os.path.join(msname, 'ASDM_SBSUMMARY') observing_modes = [] try: with casa_tools.TableReader(sbsummary_table) as tb: observing_mode = tb.getcol('observingMode') for irow in range(tb.nrows()): cell = observing_mode[:, irow] for mode in cell: if mode not in observing_modes: observing_modes.append(mode) except: LOG.warn('Error reading observing modes for %s' % ms.basename) return observing_modes
@staticmethod def _create_sbsummary_info(repSource, repFrequency, repBandwidth, repWindow, minAngResolution, maxAngResolution, maxAllowedBeamAxialRatio, sensitivity, dynamicRange, sbName): return SBSummaryInfo(repSource=repSource, repFrequency=repFrequency, repBandwidth=repBandwidth, repWindow=repWindow, minAngResolution=minAngResolution, maxAngResolution=maxAngResolution, maxAllowedBeamAxialRatio=maxAllowedBeamAxialRatio, sensitivity=sensitivity, dynamicRange=dynamicRange, sbName=sbName) @staticmethod def _read_table(ms): """ Read the ASDM_SBSummary table For all practical purposes this table consists of a single row but handle the more general case """ LOG.debug('Analysing ASDM_SBSummary table') qa = casa_tools.quanta msname = _get_ms_name(ms) sbsummary_table = os.path.join(msname, 'ASDM_SBSUMMARY') with casa_tools.TableReader(sbsummary_table) as table: try: scienceGoals = table.getcol('scienceGoal') numScienceGoals = table.getcol('numScienceGoal') except: # LOG.warn('Error reading science goals for %s' % (ms.basename)) raise repSources = [] repFrequencies = [] repBandWidths = [] repWindows = [] minAngResolutions = [] maxAngResolutions = [] maxAllowedBeamAxialRatios = [] sensitivities = [] dynamicRanges = [] sbNames = [] for i in range(table.nrows()): # Create source repSource = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'representativeSource') repSources.append(repSource) # Create frequency repFrequencyGoal = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'representativeFrequency') if repFrequencyGoal is not None: repFrequency = qa.quantity(repFrequencyGoal) else: repFrequency = qa.quantity(0.0) if repFrequency['value'] <= 0.0 or repFrequency['unit'] == '': repFrequency = None repFrequencies.append(repFrequency) # Create representative bandwidth repBandWidthGoal = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'representativeBandwidth') if repBandWidthGoal is not None: repBandWidth = qa.quantity(repBandWidthGoal) else: repBandWidth = qa.quantity(0.0) if repBandWidth['value'] <= 0.0 or repBandWidth['unit'] == '': repBandWidth = None repBandWidths.append(repBandWidth) # Create window repWindow = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'representativeWindow') if repWindow in ('none', ''): repWindow = None repWindows.append(repWindow) # Create minimum and maximum angular resolution minAngResolutionGoal = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'minAcceptableAngResolution') maxAngResolutionGoal = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'maxAcceptableAngResolution') if minAngResolutionGoal is not None: minAngResolution = qa.quantity(minAngResolutionGoal) else: minAngResolution = qa.quantity(0.0) if maxAngResolutionGoal is not None: maxAngResolution = qa.quantity(maxAngResolutionGoal) else: maxAngResolution = qa.quantity(0.0) # There are cases with minAngResolutionGoal being set to 0 arcsec # while maxAngResolutionGoal has a non-zero value (PIPE-593). if (minAngResolution['value'] <= 0.0 and maxAngResolution['value'] <= 0.0) or minAngResolution['unit'] == '': minAngResolution = None if maxAngResolution['value'] <= 0.0 or maxAngResolution['unit'] == '': maxAngResolution = None minAngResolutions.append(minAngResolution) maxAngResolutions.append(maxAngResolution) # Create maximum allowed beam axial ratio maxAllowedBeamAxialRatioGoal = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'maxAllowedBeamAxialRatio') if maxAllowedBeamAxialRatioGoal is not None: maxAllowedBeamAxialRatio = qa.quantity(maxAllowedBeamAxialRatioGoal) else: maxAllowedBeamAxialRatio = qa.quantity(0.0) if maxAllowedBeamAxialRatio['value'] <= 0.0 or maxAllowedBeamAxialRatio['value'] >= 999.: maxAllowedBeamAxialRatio = None maxAllowedBeamAxialRatios.append(maxAllowedBeamAxialRatio) # Create sensitivity goal sensitivityGoal = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'sensitivityGoal') if sensitivityGoal is not None: sensitivity = qa.quantity(sensitivityGoal) else: sensitivity = qa.quantity(0.0) if sensitivity['value'] <= 0.0 or sensitivity['unit'] == '': sensitivity = None sensitivities.append(sensitivity) # Create dynamic range goal dynamicRangeGoal = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'dynamicRange') if dynamicRangeGoal is not None: dynamicRange = qa.quantity(dynamicRangeGoal) else: dynamicRange = qa.quantity(0.0) dynamicRanges.append(dynamicRange) sbName = _get_science_goal_value(scienceGoals[0:numScienceGoals[i], i], 'SBName') sbNames.append(sbName) rows = list(zip(repSources, repFrequencies, repBandWidths, repWindows, minAngResolutions, maxAngResolutions, maxAllowedBeamAxialRatios, sensitivities, dynamicRanges, sbNames)) return rows
[docs]class ExecblockTable(object):
[docs] @staticmethod def get_execblock_info(ms): try: execblock_info = [ExecblockTable._create_execblock_info(*row) for row in ExecblockTable._read_table(ms)] if execblock_info[0][0] == 'ALMA': if execblock_info[0][1] == 'A': return None else: return execblock_info[0][1] else: return execblock_info[0][1] except: return None
@staticmethod def _create_execblock_info(telescopeName, configName): return telescopeName, configName @staticmethod def _read_table(ms): """ Read the ASDM_EXECBLOCK table For all practical purposes this table consists of a single row but handle the more general case """ LOG.debug('Analysing ASDM_EXECBLOCK table') msname = _get_ms_name(ms) execblock_table = os.path.join(msname, 'ASDM_EXECBLOCK') with casa_tools.TableReader(execblock_table) as table: telescope_names = table.getcol('telescopeName') config_names = table.getcol('configName') # In case multiple columns are extracted at some point # in which case rows would be constructed from the zipped # columns rows = list(zip(telescope_names, config_names)) return rows
[docs]class PolarizationTable(object):
[docs] @staticmethod def get_polarizations(msmd): pol_ids = sorted({int(i) for i in msmd.polidfordatadesc()}) num_corrs = [msmd.ncorrforpol(i) for i in pol_ids] corr_types = [msmd.corrtypesforpol(i) for i in pol_ids] corr_products = [msmd.corrprodsforpol(i) for i in pol_ids] return [PolarizationTable._create_pol_description(*row) for row in zip(pol_ids, num_corrs, corr_types, corr_products)]
@staticmethod def _create_pol_description(id, num_corr, corr_type, corr_product): return domain.Polarization(id, num_corr, corr_type, corr_product)
[docs]class SourceTable(object):
[docs] @staticmethod def get_sources(msmd): rows = SourceTable._read_table(msmd) # duplicate source entries may be present due to duplicate entries # differing by non-essential columns, such as spw key_fn = operator.itemgetter(0) data = sorted(rows, key=key_fn) grouped_by_source_id = [] for _, g in itertools.groupby(data, key_fn): grouped_by_source_id.append(list(g)) no_dups = [s[0] for s in grouped_by_source_id] return [SourceTable._create_source(*row) for row in no_dups]
@staticmethod def _create_source(source_id, name, direction, proper_motion, is_eph_obj): return domain.Source(source_id, name, direction, proper_motion, is_eph_obj) @staticmethod def _read_table(msmd): """ Read the SOURCE table of the given measurement set. """ LOG.debug('Analysing SOURCE table') ids = msmd.sourceidsfromsourcetable() sourcenames = msmd.sourcenames() directions = [v for _, v in sorted(msmd.sourcedirs().items(), key=lambda d: int(d[0]))] propermotions = [v for _, v in sorted(msmd.propermotions().items(), key=lambda pm: int(pm[0]))] eph_sourcenames = SourceTable._get_eph_sourcenames(msmd.name()) is_eph_objs = [sourcename in eph_sourcenames for sourcename in sourcenames] all_sources = list(zip(ids, sourcenames, directions, propermotions, is_eph_objs)) # Only return sources for which scans are present. # Create a mapping of source id to a boolean of whether any # scans are present for that source, using fields to link # between sources and scans. source_id_to_scans = {} for source_id in set(msmd.sourceidsfromsourcetable()): fields_for_source = set(msmd.fieldsforsource(source_id)) # Fields do not necessarily have associated scans. If only the # first field is tested and gives a negative result, CAS-9499 # results (AttributeError: 'Field' object has no attribute # 'source'). Prevent this by testing all fields for the # presence of scans. source_id_to_scans[source_id] = any([len(msmd.scansforfield(field_id)) is not 0 for field_id in fields_for_source]) return [row for row in all_sources if source_id_to_scans.get(row[0], False)] @staticmethod def _get_eph_sourcenames(msname): ephemeris_tables = glob.glob(msname+'/FIELD/EPHEM*.tab') eph_sourcenames = [] for ephemeris_table in ephemeris_tables: with casa_tools.TableReader(ephemeris_table) as tb: keywords = tb.getkeywords() eph_sourcenames.append(keywords['NAME']) return eph_sourcenames
[docs]class StateTable(object):
[docs] @staticmethod def get_states(msmd): state_factory = StateTable.get_state_factory(msmd) LOG.trace('Opening STATE table to read STATE.OBS_MODE') state_table = os.path.join(msmd.name(), 'STATE') with casa_tools.TableReader(state_table) as table: obs_modes = table.getcol('OBS_MODE') states = [] for i in range(msmd.nstates()): obs_mode = obs_modes[i] state = state_factory.create_state(i, obs_mode) states.append(state) return states
[docs] @staticmethod def get_state_factory(msmd): names = set(msmd.observatorynames()) assert len(names) is 1 facility = names.pop() first_scan = min(msmd.scannumbers()) scan_start = min(msmd.timesforscan(first_scan)) LOG.trace('Opening MS to read TIME keyword to avoid ' 'msmd.timesforscan() units ambiguity') with casa_tools.TableReader(msmd.name()) as table: time_colkeywords = table.getcolkeywords('TIME') time_unit = time_colkeywords['QuantumUnits'][0] time_ref = time_colkeywords['MEASINFO']['Ref'] me = casa_tools.measures qa = casa_tools.quanta epoch_start = me.epoch(time_ref, qa.quantity(scan_start, time_unit)) str_start = qa.time(epoch_start['m0'], form=['fits'])[0] dt_start = datetime.datetime.strptime(str_start, '%Y-%m-%dT%H:%M:%S') return domain.state.StateFactory(facility, dt_start)
[docs]class FieldTable(object): @staticmethod def _read_table(msmd): num_fields = msmd.nfields() field_ids = list(range(num_fields)) field_names = msmd.namesforfields() times = [msmd.timesforfield(i) for i in field_ids] phase_centres = [msmd.phasecenter(i) for i in field_ids] source_ids = [msmd.sourceidforfield(i) for i in field_ids] LOG.trace('Opening FIELD table to read FIELD.SOURCE_TYPE') field_table = os.path.join(msmd.name(), 'FIELD') with casa_tools.TableReader(field_table) as table: # TODO can this old code be removed? We've not handled non-APDMs # for a *long* time! # # FIELD.SOURCE_TYPE contains the intents in non-APDM MS if 'SOURCE_TYPE' in table.colnames(): source_types = table.getcol('SOURCE_TYPE') else: source_types = [None] * num_fields all_fields = list(zip(field_ids, field_names, source_ids, times, source_types, phase_centres)) # only return sources for which scans are present # create a mapping of source id to a boolean of whether any scans are present for that source field_id_to_scans = {field_id: (len(msmd.scansforfield(field_id)) is not 0) for field_id in set(field_ids)} return [row for row in all_fields if field_id_to_scans.get(row[0], False)]
[docs] @staticmethod def get_fields(msmd): return [FieldTable._create_field(*row) for row in FieldTable._read_table(msmd)]
@staticmethod def _create_field(field_id, name, source_id, time, source_type, phase_centre): field = domain.Field(field_id, name, source_id, time, phase_centre) if source_type: field.set_source_type(source_type) return field
def _make_range(f_min, f_max): return measures.FrequencyRange(measures.Frequency(f_min), measures.Frequency(f_max))
[docs]class BandDescriber(object): alma_bands = {'ALMA Band 1': _make_range(31.3, 45), 'ALMA Band 2': _make_range(67, 90), 'ALMA Band 3': _make_range(84, 116), 'ALMA Band 4': _make_range(125, 163), 'ALMA Band 5': _make_range(163, 211), 'ALMA Band 6': _make_range(211, 275), 'ALMA Band 7': _make_range(275, 373), 'ALMA Band 8': _make_range(385, 500), 'ALMA Band 9': _make_range(602, 720), 'ALMA Band 10': _make_range(787, 950)} # From original EVLA pipeline script # FLOW = [ 0.0e6, 150.0e6, 700.0e6, 2.0e9, 4.0e9, 8.0e9, 12.0e9, 18.0e9, 26.5e9, 40.0e9 ] # FHIGH = [ 150.0e6, 700.0e6, 2.0e9, 4.0e9, 8.0e9, 12.0e9, 18.0e9, 26.5e9, 40.0e9, 56.0e9 ] # BBAND = [ '4', 'P', 'L', 'S', 'C', 'X', 'U', 'K', 'A', 'Q' ] evla_bands = {'20cm (L)': _make_range(0.7, 2.0), '13cm (S)': _make_range(2.0, 4.0), '6cm (C)': _make_range(4, 8), '3cm (X)': _make_range(8, 12), '2cm (Ku)': _make_range(12, 18), '1.3cm (K)': _make_range(18, 26.5), '1cm (Ka)': _make_range(26.5, 40), '0.7cm (Q)': _make_range(40, 56.0)} unknown = {'Unknown': measures.FrequencyRange()}
[docs] @staticmethod def get_description(f, observatory='ALMA'): if observatory.upper() in ('ALMA',): bands = BandDescriber.alma_bands elif observatory.upper() in ('VLA', 'EVLA'): bands = BandDescriber.evla_bands else: bands = BandDescriber.unknown for description, rng in bands.items(): if rng.contains(f): return description return 'Unknown'
[docs]class RetrieveByIndexContainer: """ RetrieveByIndexContainer is a container for items whose numeric index or other unique identifier is stored in an instance attribute. Retrieving by index from this container matches and returns the item with matching index attribute, which may differ from the natural position of the item in the underlying list backing store. For instance, getting item 3 with container[3] returns the item with index attribute == 3, not the item at position 3. """ def __init__(self, items, index_fn=operator.attrgetter('id')): """ Create a new RetrieveByIndexContainer. The list of items passed as the 'items' argument is set as an instance attribute (i.e., a copy or deep copy is not made). No changes should be made to the list after passing it to this constructor. :param items: the list of indexable items to wrap :param index_fn: function that returns the index of an item instance """ self.__items = items self.__index_fn = index_fn def __iter__(self): return iter(self.__items) def __len__(self): return len(self.__items) def __getitem__(self, index): try: index = int(index) except ValueError: raise TypeError( 'list indices must be integers, not {}'.format(index.__class__.__name__)) with_id = [i for i in self.__items if self.__index_fn(i) == index] if not with_id: raise IndexError('list index out of range: {}'.format(index)) if len(with_id) > 1: raise IndexError('more than one object found with ID {}'.format(index)) return with_id.pop() def __str__(self): return '<RetrieveByIndexContainer({})>'.format(str(self.__items))