Source code for pipeline.hifv.tasks.exportdata.vlaifaqua

import operator
import os
import xml.etree.cElementTree as ElementTree

import pipeline.domain.measures as measures
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.launcher as launcher

import pipeline.h.tasks.exportdata.aqua as aqua
from pipeline.h.tasks.exportdata.aqua import UNDEFINED, export_to_disk

LOG = logging.get_logger(__name__)


[docs]def aqua_report_from_file(context_file, aqua_file): """ Create AQUA report from a context file on disk. """ # Restore context from file LOG.info('Opening context file: {!s}'.format(context_file)) context = launcher.Pipeline(context=context_file).context # Produce the AQUA report aqua_report_from_context(context, aqua_file)
[docs]def aqua_test_report_from_local_file(context_file, aqua_file): """ Test AQUA report generation. The pipeline context file and web log directory must be in the same local directry """ LOG.info('Opening context file: {!s} for test'.format(context_file)) context = launcher.Pipeline(context=context_file, path_overrides={'name': os.path.splitext(context_file)[0], 'output_dir': os.getcwd()}).context # Produce the AQUA report aqua_report_from_context(context, aqua_file)
[docs]def aqua_report_from_context(context, aqua_file): """ Create AQUA report from a context object. """ LOG.info('Recipe name: %s' % 'Unknown') LOG.info(' Number of stages: %d' % context.task_counter) # Initialize generator = VLAAquaXmlGenerator() report = generator.get_report_xml(context) LOG.info('Writing aqua report file: %s' % aqua_file) export_to_disk(report, aqua_file)
[docs]class VLAAquaXmlGenerator(aqua.AquaXmlGenerator): """ Class for creating the AQUA pipeline report """ def __init__(self): super(VLAAquaXmlGenerator, self).__init__()
[docs] def get_project_structure(self, context): # get base XML from base class root = super(VLAAquaXmlGenerator, self).get_project_structure(context) # add our ALMA-specific elements ElementTree.SubElement(root, 'OusEntityId').text = context.project_structure.ous_entity_id ElementTree.SubElement(root, 'OusPartId').text = context.project_structure.ous_part_id ElementTree.SubElement(root, 'OusStatusEntityId').text = context.project_structure.ousstatus_entity_id return root
[docs] def get_calibration_topic(self, context, topic_results): # get base XML from base class xml_root = super(VLAAquaXmlGenerator, self).get_calibration_topic(context, topic_results) m = { 'hifa_gfluxscale': (operator.attrgetter('measurements'), lambda r: str(r.qa.representative.score)) } flux_xml = flux_xml_for_stages(context, topic_results, m) xml_root.extend(flux_xml) return xml_root
[docs] def get_dataset_topic(self, context, topic_results): # get base XML from base class xml_root = super(VLAAquaXmlGenerator, self).get_dataset_topic(context, topic_results) m = { 'hifv_importdata': (lambda x: x.setjy_results[0].measurements, lambda _: UNDEFINED), } flux_xml = flux_xml_for_stages(context, topic_results, m) # omit containing flux measurement element if no measurements were found if len(list(flux_xml)) > 0: xml_root.extend(flux_xml) sensitivity_xml = aqua.sensitivity_xml_for_stages(context, topic_results) # omit containing element if no measurements were found if len(list(sensitivity_xml)) > 0: xml_root.extend(sensitivity_xml) return xml_root
[docs]def flux_xml_for_stages(context, results, accessor_dict): """ Get the XML for flux measurements contained in a list of results. This function is a higher-order function; it expects to be given a dict of accessor functions, which it uses to access the flux measurements and QA score of the appropriate results. 'Appropriate' means that the task name matches the dict key. This lets the function call different accessor functions for different types of result. The dict accessor dict uses task names as keys, with values as two-tuples comprising 1. a function to access the flux measurements for a result 2. a function to access the QA score for that result :param context: pipeline context :param results: results to process :param accessor_dict: dict of accessor functions :return: XML for flux measurements :rtype: xml.etree.cElementTree.Element """ xml_root = ElementTree.Element('FluxMeasurements') for result in results: pipeline_casa_task = result.pipeline_casa_task for task_name, (flux_accessor, score_accessor) in accessor_dict.items(): # need parenthesis to distinguish between cases such as # hifa_gfluxscale and hifa_gfluxscaleflag if pipeline_casa_task.startswith(task_name + '('): flux_xml = xml_for_flux_stage(context, result, task_name, flux_accessor, score_accessor) xml_root.append(flux_xml) return xml_root
[docs]def xml_for_flux_stage(context, stage_results, origin, accessor, score_accessor): """ Get the XML for all flux measurements contained in a ResultsList. :param context: pipeline context :param stage_results: ResultList containing flux results to summarise :param origin: text for Origin attribute value :param accessor: function that returns the flux measurements from the Result :param score_accessor: function that returns the QA score for the Result :rtype: xml.etree.cElementTree.Element """ score = score_accessor(stage_results) xml_root = ElementTree.Element('FluxMeasurements', Origin=origin, Score=score) for result in stage_results: vis = os.path.basename(result.inputs['vis']) ms_for_result = context.observing_run.get_ms(vis) measurements = accessor(result) ms_xml = xml_for_extracted_flux_measurements(measurements, ms_for_result) xml_root.extend(ms_xml) return xml_root
[docs]def xml_for_extracted_flux_measurements(all_measurements, ms): """ Get the XML for a set of flux measurements extracted from a Result. :param all_measurements: flux measurements dict. :param ms: measurement set :return: XML :rtype: xml.etree.cElementTree.Element """ asdm = aqua.vis_to_asdm(ms.name) result = [] for field_id, field_measurements in all_measurements.items(): field = ms.get_fields(field_id)[0] field_name = field.name if field_name.startswith('"') and field_name.endswith('"'): field_name = field_name[1:-1] for measurement in sorted(field_measurements, key=lambda m: int(m.spw_id)): spw = ms.get_spectral_window(measurement.spw_id) freq_ghz = '{:.6f}'.format(spw.centre_frequency.to_units(measures.FrequencyUnits.GIGAHERTZ)) # I only for now ... for stokes in ['I']: try: flux = getattr(measurement, stokes) flux_jy = flux.to_units(measures.FluxDensityUnits.JANSKY) flux_jy = '{:.3f}'.format(flux_jy) except: continue try: unc = getattr(measurement.uncertainty, stokes) unc_jy = unc.to_units(measures.FluxDensityUnits.JANSKY) if unc_jy != 0: unc_jy = '{:.6f}'.format(unc_jy) else: unc_jy = '' except: unc_jy = '' xml = ElementTree.Element('FluxMeasurement', SpwName=spw.name, MsSpwId=str(spw.id), FluxJy=flux_jy, ErrorJy=unc_jy, Asdm=asdm, Field=field_name, FrequencyGHz=freq_ghz) result.append(xml) return result
def _hifa_preimagecheck_sensitivity_exporter(stage_results): # XML exporter expects this function to return a list of dictionaries l = [] for result in stage_results: l.extend(result.sensitivities_for_aqua) return l aqua.TASK_NAME_TO_SENSITIVITY_EXPORTER['hifa_imageprecheck'] = _hifa_preimagecheck_sensitivity_exporter