Source code for pipeline.h.tasks.importdata.fluxes

import collections
import csv
import decimal
import itertools
import operator
import os
import re
import xml.etree.ElementTree as ElementTree
from datetime import datetime
from functools import reduce

import pipeline.domain as domain
import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.utils as utils
from pipeline.infrastructure.tablereader import SpectralWindowTable
from ..common import commonfluxresults

LOG = infrastructure.get_logger(__name__)

ORIGIN_XML = 'Source.xml'
ORIGIN_ANALYSIS_UTILS = 'analysisUtils'


def get_setjy_results(mses):
    """
    Get the flux results from the ASDM Source.xml file and populate the
    context.
    """
    results = []
    for ms in mses:
        result = commonfluxresults.FluxCalibrationResults(ms.name)
        science_spw_ids = [spw.id for spw in ms.get_spectral_windows()]

        for source, measurements in read_fluxes_nodb(ms).items():
            m = [m for m in measurements if int(m.spw_id) in science_spw_ids]

            # import flux values for all fields and intents so that we can
            # compare them to the fluxscale-derived values later in the run
            # for field in [f for f in source.fields if 'AMPLITUDE' in f.intents]:
            for field in source.fields:
                result.measurements[field.id].extend(m)

        results.append(result)

    return results
def read_fluxes_nodb(ms):
    """
    Read fluxes from the Source XML table, translating from ASDM spw ids to
    MS spw ids as we go.
    """
    result = collections.defaultdict(list)

    source_table = os.path.join(ms.name, 'Source.xml')
    if not os.path.exists(source_table):
        LOG.info('No Source XML found at {}. No flux import performed.'.format(source_table))
        return result

    source_element = ElementTree.parse(source_table)
    if not source_element:
        LOG.info('Could not parse Source XML at {}. No flux import performed.'.format(source_table))
        return result

    # Empty spws that follow non-empty spws can be pruned from MMS data. This
    # set is used to check whether the Source.xml entries refer to spws
    # actually present in the measurement set.
    all_ms_spw_ids = {spw.id for spw in ms.spectral_windows}

    # SCIREQ-852: MS spw IDs != ASDM spw ids
    asdm_to_ms_spw_map = SpectralWindowTable.get_asdm_to_ms_spw_mapping(ms)

    for row in source_element.findall('row'):
        # get the row elements
        flux_text = row.findtext('flux')
        frequency_text = row.findtext('frequency')
        source_id_text = row.findtext('sourceId')
        spw_element = row.findtext('spectralWindowId')

        if spw_element is None or source_id_text is None:
            continue

        # spws can overlap, so rather than looking up the spw by frequency,
        # extract the spw id from the element text, which takes the form
        # 'SpectralWindow_13'
        _, asdm_spw_id = spw_element.split('_')

        # SCIREQ-852: MS spw IDs != ASDM spw ids
        spw_id = asdm_to_ms_spw_map.get(int(asdm_spw_id), None)
        if spw_id not in all_ms_spw_ids:
            LOG.warning('Could not map ASDM spectral window {} to MS for {}'.format(asdm_spw_id, ms.basename))
            continue

        source_id = int(source_id_text)
        if source_id >= len(ms.sources):
            LOG.warning('Source.xml refers to source #{}, which was not found in {}'.format(source_id, ms.basename))
            continue
        source = ms.sources[source_id]

        # both the flux and frequency elements must contain data to proceed
        if flux_text is None or frequency_text is None:
            continue

        # get the measurement
        m = get_measurement(ms, spw_id, frequency_text, flux_text)
        result[source].append(m)

    return result
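
# A minimal, self-contained sketch of the row parsing performed above. The
# XML content is hypothetical but follows the layout read_fluxes_nodb
# expects: a 'SpectralWindow_<n>' reference whose trailing integer is the
# ASDM spw id, which must then be mapped to an MS spw id before use.
#
# import xml.etree.ElementTree as ElementTree
#
# xml_text = """
# <SourceTable>
#   <row>
#     <sourceId>0</sourceId>
#     <spectralWindowId>SpectralWindow_13</spectralWindowId>
#     <frequency>1 1 1.0E11</frequency>
#     <flux>2 1 4 1.5 0.0 0.0 0.0</flux>
#   </row>
# </SourceTable>
# """
#
# root = ElementTree.fromstring(xml_text)
# for row in root.findall('row'):
#     _, asdm_spw_id = row.findtext('spectralWindowId').split('_')
#     print(int(asdm_spw_id), row.findtext('flux'))
#     # -> 13 2 1 4 1.5 0.0 0.0 0.0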
def get_measurement(ms, spw, frequency_text, flux_text):
    """
    Construct a flux measurement for the given spectral window.
    """
    # more than one measurement can be registered against the spectral
    # window. These functions give lists of frequencies and IQUV 4-tuples.
    row_frequencies = to_hertz(frequency_text)
    row_iquvs = to_jansky(flux_text)

    spw = ms.get_spectral_window(spw)

    # Task: select the flux measurement closest to the spectral window centre
    # frequency, taking the mean when measurements are equally distant

    # first, sort the measurements by distance to the spw centre frequency,
    # annotating each tuple with the delta frequency
    by_delta = sorted([(abs(spw.centre_frequency - f), f, iquv)
                       for f, iquv in zip(row_frequencies, row_iquvs)])

    # identify all measurements equally as close as this provisionally
    # 'closest' measurement
    min_delta, closest_frequency, _ = by_delta[0]
    joint_closest = [iquv for delta_f, _, iquv in by_delta if delta_f == min_delta]
    if len(joint_closest) > 1:
        LOG.trace('Averaging {} equally close measurements: {}'.format(len(joint_closest), joint_closest))

    # calculate the mean of these equally distant measurements. joint_closest
    # has at least one item, so we don't need to prime the reduce function
    # with an empty accumulator
    mean_iquv = [reduce(lambda x, y: x + y, stokes) / len(joint_closest)
                 for stokes in zip(*joint_closest)]

    LOG.info('Closest flux measurement for {} spw {} found {} distant from centre of spw'
             ''.format(ms.basename, spw, min_delta))

    # Even if a mean was calculated, any alternative selection would be
    # equally distant and therefore outside the spw range too
    if not spw.min_frequency < closest_frequency < spw.max_frequency:
        # This might become a warning once the PRTSPR-20823 fix is active
        LOG.info('Closest flux measurement for {} spw {} falls outside the spw, {} distant from the spectral '
                 'window centre'.format(ms.basename, spw, min_delta))

    m = domain.FluxMeasurement(spw.id, *mean_iquv, origin=ORIGIN_XML)
    return m
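
# Illustrative sketch of the closest-measurement selection above, using plain
# floats in place of Frequency/FluxDensity domain objects. Two measurements
# are equally distant from a hypothetical spw centre of 100.0, so their IQUV
# tuples are averaged. (The module uses reduce() rather than sum(),
# presumably because domain objects cannot be added to sum()'s integer start
# value; with floats, sum() works equally well.)
#
# centre = 100.0
# row_frequencies = [99.0, 101.0, 110.0]
# row_iquvs = [(1.0, 0.0, 0.0, 0.0), (3.0, 0.0, 0.0, 0.0), (9.0, 0.0, 0.0, 0.0)]
#
# by_delta = sorted([(abs(centre - f), f, iquv)
#                    for f, iquv in zip(row_frequencies, row_iquvs)])
# min_delta, _, _ = by_delta[0]
# joint_closest = [iquv for delta_f, _, iquv in by_delta if delta_f == min_delta]
#
# mean_iquv = [sum(stokes) / len(joint_closest) for stokes in zip(*joint_closest)]
# print(mean_iquv)  # [2.0, 0.0, 0.0, 0.0]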
def to_jansky(flux_text):
    """
    Convert a string extracted from an ASDM XML element to FluxDensity
    domain objects.
    """
    flux_fn = lambda f: measures.FluxDensity(float(f), measures.FluxDensityUnits.JANSKY)
    return get_atoms(flux_text, flux_fn)
def to_hertz(freq_text):
    """
    Convert a string extracted from an ASDM XML element to Frequency
    domain objects.
    """
    freq_fn = lambda f: measures.Frequency(float(f), measures.FrequencyUnits.HERTZ)
    return get_atoms(freq_text, freq_fn)
def get_atoms(text, conv_fn=lambda x: x):
    """
    Get the individual measurements from an ASDM element.

    This function converts a CASA record from a linear, space-separated
    string into a multidimensional list, using the dimension headers given
    at the start of the CASA record to determine the number and size of
    each dimension.

    text - text from an ASDM element, with space-separated values
    conv_fn - optional function converting a string to a user-defined type
    """
    values = text.split()

    # syntax is <num dimensions> <size dimension 1> <size dimension 2> etc.
    num_dimensions = int(values[0])
    dimension_sizes = list(map(int, values[1:num_dimensions + 1]))

    # find how many values are needed to form one complete 'entity'
    step_size = reduce(operator.mul, dimension_sizes)

    # idx holds the index of the first value for each entity
    idx = len(dimension_sizes) + 1

    results = []
    while idx < len(values):
        # get our complete entity as a linear list of strings, ready to be
        # parcelled up into dimensions
        data = values[idx:idx + step_size]

        # convert the values using the given function, eg. from string to Jy
        data = list(map(conv_fn, data))

        # group the values into dimensions using the sizes in the header
        for s in dimension_sizes[-1:0:-1]:
            data = list(grouper(s, data))

        results.extend(data)
        idx += step_size

    return results
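
# Illustrative examples of the record format get_atoms expects, assuming
# this module's get_atoms and grouper are in scope. The header '2 2 4'
# declares a 2-dimensional 2x4 record, i.e. two IQUV 4-tuples; the values
# themselves are hypothetical.
#
# print(get_atoms('2 2 4 1.0 0.0 0.0 0.0 0.9 0.1 0.0 0.0', float))
# # -> [(1.0, 0.0, 0.0, 0.0), (0.9, 0.1, 0.0, 0.0)]
#
# # a 1-dimensional record needs no regrouping and comes back as a flat list
# print(get_atoms('1 2 1.0E11 2.0E11', float))
# # -> [100000000000.0, 200000000000.0]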
def grouper(n, iterable, fillvalue=None):
    """
    grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx
    """
    args = [iter(iterable)] * n
    return itertools.zip_longest(fillvalue=fillvalue, *args)
def CYCLE7_export_flux_from_result(results, context, filename='flux.csv'):
    """
    Export flux densities from a set of results to a CSV file.

    This function was reverted because it came too late to the C6 deadline
    for analysisUtils to match the new format. It should be committed during
    C7 development.
    """
    if not isinstance(results, list):
        results = [results]
    abspath = os.path.join(context.output_dir, filename)

    columns = ['ms', 'field', 'spw', 'I', 'Q', 'U', 'V', 'spix', 'origin', 'query_date', 'age', 'comment']

    # if the file exists, read it in
    existing = []
    if os.path.exists(abspath):
        with open(abspath, 'r') as f:
            # slurp in all but the header rows
            existing.extend([l for l in f.readlines() if not l.startswith(','.join(columns))])

    # so we can write it back out again, with our measurements appended
    with open(abspath, 'wt') as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        f.writelines(existing)

        counter = 0
        for setjy_result in results:
            ms_name = setjy_result.vis
            ms_basename = os.path.basename(ms_name)
            for field_id, measurements in setjy_result.measurements.items():
                for m in measurements:
                    prefix = '%s,%s,%s' % (ms_basename, field_id, m.spw_id)
                    for row in existing:
                        if row.startswith(prefix):
                            LOG.info('Not overwriting flux data for %s field %s spw %s in %s'
                                     % (ms_basename, field_id, m.spw_id, os.path.basename(abspath)))
                            break
                    else:
                        (I, Q, U, V) = m.casa_flux_density
                        ms = context.observing_run.get_ms(ms_basename)
                        field = ms.get_fields(field_id)[0]
                        comment = "'" + utils.dequote(field.name) + "' intent=" + ','.join(sorted(field.intents))
                        writer.writerow((ms_basename, field_id, m.spw_id, I, Q, U, V,
                                         float(m.spix), m.origin, m.queried_at, m.age, comment))
                        counter += 1

    LOG.info('Exported %s flux measurements to %s' % (counter, abspath))
def export_flux_from_result(results, context, filename='flux.csv'):
    """
    Export flux densities from a set of results to a CSV file.
    """
    if not isinstance(results, list):
        results = [results]
    abspath = os.path.join(context.output_dir, filename)

    columns = ['ms', 'field', 'spw', 'I', 'Q', 'U', 'V', 'spix', 'uvmin', 'uvmax', 'comment']
    old_standard_cols = columns[:8] + columns[10:]
    use_old_std_cols = False

    # if the file exists, read it in
    existing = []
    if os.path.exists(abspath):
        with open(abspath, 'r') as f:
            first = f.readline()
            if not first.startswith(','.join(columns)):
                # Try the old format, without uvmin/uvmax (before r42290)
                if first.startswith(','.join(old_standard_cols)):
                    columns = old_standard_cols
                    use_old_std_cols = True
                else:
                    raise ValueError('Cannot recognize header line in flux file: {0}'.format(first))

            # slurp in all but the header rows
            existing.extend([l for l in f.readlines() if not l.startswith(','.join(columns))])

    # so we can write it back out again, with our measurements appended
    comment_template = '# field={field} intents={intents} origin={origin} age={age} queried_at={queried_at}'

    with open(abspath, 'wt') as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        f.writelines(existing)

        counter = 0
        for setjy_result in results:
            ms_name = setjy_result.vis
            ms_basename = os.path.basename(ms_name)
            for field_id, measurements in setjy_result.measurements.items():
                for m in measurements:
                    prefix = '%s,%s,%s' % (ms_basename, field_id, m.spw_id)
                    for row in existing:
                        if row.startswith(prefix):
                            LOG.info('Not overwriting flux data for {} field {} spw {}'
                                     ''.format(ms_basename, field_id, m.spw_id))
                            break
                    else:
                        (I, Q, U, V) = m.casa_flux_density
                        ms = context.observing_run.get_ms(ms_basename)
                        field = ms.get_fields(field_id)[0]
                        origin = m.origin if m.origin else 'N/A'
                        age = m.age if m.age else 'N/A'
                        queried_at = m.queried_at if m.queried_at else 'N/A'
                        comment = comment_template.format(field=field.clean_name,
                                                          intents=','.join(sorted(field.intents)),
                                                          origin=origin, age=age, queried_at=queried_at)

                        if not use_old_std_cols:
                            out_row = [ms_basename, field_id, m.spw_id, I, Q, U, V,
                                       float(m.spix), float(m.uvmin), float(m.uvmax), comment]
                        else:
                            out_row = [ms_basename, field_id, m.spw_id, I, Q, U, V,
                                       float(m.spix), comment]
                        writer.writerow(out_row)
                        counter += 1

    LOG.info('Exported %s flux measurements to %s' % (counter, abspath))
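
# An illustrative (hypothetical) flux.csv row in the current format, as
# written above and later re-read by import_flux. csv.writer quotes the
# comment field because the joined intents contain commas:
#
#   ms,field,spw,I,Q,U,V,spix,uvmin,uvmax,comment
#   uid___A002_Xc2ae09_X27f.ms,0,16,1.5,0.0,0.0,0.0,0.0,0.0,0.0,"# field=J1733-1304 intents=AMPLITUDE,BANDPASS origin=Source.xml age=N/A queried_at=N/A"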
def import_flux(output_dir, observing_run, filename=None):
    """
    Read flux densities from a CSV file and import them into the context.
    """
    # regular expressions to match values from the comment template
    origin_re = re.compile(r'(?:origin=)(?P<origin>\S+)')
    age_re = re.compile(r'(?:age=)(?P<age>\S+)')
    Band3age_re = re.compile(r'(?:Band3age=)(?P<Band3age>\S+)')
    query_re = re.compile(r'(?:queried_at=)(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \w{3})')

    if not filename:
        filename = os.path.join(output_dir, 'flux.csv')

    with open(filename, 'rt') as f:
        reader = csv.DictReader(f, restkey='others', restval=None)

        counter = 0
        for row in reader:
            ms_name = row['ms']

            try:
                ms = observing_run.get_ms(ms_name)
            except KeyError:
                # No MS registered by that name. This could be caused by a
                # flux.csv from a previous run.
                LOG.info('{} refers to unregistered ASDM \'{}\'. If this is a multi-ASDM run, this is to be '
                         'expected.'.format(filename, ms_name))
                continue

            field_id = int(row['field'])
            spw_id = int(row['spw'])
            I = row['I']
            Q = row['Q']
            U = row['U']
            V = row['V']

            try:
                spix = decimal.Decimal(row['spix'])
            except (decimal.InvalidOperation, KeyError):
                spix = decimal.Decimal('0.0')

            try:
                uvmin = decimal.Decimal(row['uvmin'])
            except (decimal.InvalidOperation, KeyError):
                uvmin = decimal.Decimal('0.0')

            try:
                uvmax = decimal.Decimal(row['uvmax'])
            except (decimal.InvalidOperation, KeyError):
                uvmax = decimal.Decimal('0.0')

            comment = row['comment']

            match = origin_re.search(comment)
            origin = match.group('origin') if match else None
            if origin == 'N/A':
                origin = None
            if 'Jy, freq=' in comment:
                origin = ORIGIN_ANALYSIS_UTILS

            match = age_re.search(comment)
            age = match.group('age') if match else None
            if age:
                try:
                    age = float(age)
                except ValueError:
                    age = None

            # Replace age with Band3age if age is not available
            match = Band3age_re.search(comment)
            Band3age = match.group('Band3age') if match else None
            if age is None and Band3age is not None:
                try:
                    age = float(Band3age)
                    LOG.info('Using Band3age value of {!s} for field {!s}, spw {!s}'
                             ''.format(Band3age, field_id, spw_id))
                except ValueError:
                    age = None

            match = query_re.search(comment)
            query_date = match.group('timestamp') if match else None
            if query_date:
                try:
                    query_date = datetime.strptime(query_date, '%Y-%m-%d %H:%M:%S %Z')
                except (TypeError, ValueError):
                    query_date = None

            try:
                fields = ms.get_fields(field_id)
                measurement = domain.FluxMeasurement(spw_id, I, Q, U, V, spix, uvmin, uvmax,
                                                     origin=origin, age=age, queried_at=query_date)

                # A single field identifier could map to multiple field
                # objects, but the flux should be the same for all, so we
                # iterate..
                for field in fields:
                    # .. removing any existing measurements in these spws
                    # from these fields..
                    to_remove = [m for m in field.flux_densities if m.spw_id == spw_id]
                    for flux_density in to_remove:
                        field.flux_densities.remove(flux_density)

                    # .. and then updating with our new values
                    LOG.trace('Adding {} to spw {}'.format(measurement, spw_id))
                    field.flux_densities.add(measurement)
                    counter += 1
            except Exception as e:
                LOG.debug(e)
                LOG.warning('Problem importing \'{}\' as a flux statement'.format(row))

    LOG.info('Imported {} flux measurements from {}'.format(counter, filename))

    # Convert into a set of results for the web log
    results = []
    for ms in observing_run.measurement_sets:
        science_spw_ids = [spw.id for spw in ms.get_spectral_windows(science_windows_only=True)]
        result = commonfluxresults.FluxCalibrationResults(ms.name)
        for field in ms.get_fields():
            if field.flux_densities is None:
                continue
            for flux in field.flux_densities:
                if flux.spw_id not in science_spw_ids:
                    continue
                # Important! The field ID is used rather than the field name
                # as some data describe independent fields with the same
                # name, e.g., J1733-1304 is both a science target and phase
                # calibrator in uid://A002/Xc2ae09/X27f.ms.
                #
                # More info:
                # https://open-jira.nrao.edu/browse/CAS-10792?focusedCommentId=123387&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-123387
                result.measurements[field.id].append(flux)
        results.append(result)

    return results
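
# Illustrative check of the comment-parsing regexes above, run against a
# hypothetical comment of the kind written by export_flux_from_result:
#
# import re
# from datetime import datetime
#
# origin_re = re.compile(r'(?:origin=)(?P<origin>\S+)')
# age_re = re.compile(r'(?:age=)(?P<age>\S+)')
# query_re = re.compile(r'(?:queried_at=)(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \w{3})')
#
# comment = '# field=J1733-1304 intents=AMPLITUDE origin=Source.xml age=2.0 queried_at=2019-01-01 12:00:00 UTC'
# print(origin_re.search(comment).group('origin'))   # Source.xml
# print(float(age_re.search(comment).group('age')))  # 2.0
# print(datetime.strptime(query_re.search(comment).group('timestamp'), '%Y-%m-%d %H:%M:%S %Z'))
# # -> 2019-01-01 12:00:00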