import operator
import os
import xml.etree.cElementTree as ElementTree
import pipeline.domain.measures as measures
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.launcher as launcher
import pipeline.h.tasks.exportdata.aqua as aqua
from pipeline.h.tasks.exportdata.aqua import UNDEFINED, export_to_disk
LOG = logging.get_logger(__name__)
def aqua_report_from_file(context_file, aqua_file):
    """
    Create an AQUA report from a pipeline context saved on disk.

    The context is restored through the pipeline launcher and then handed to
    :func:`aqua_report_from_context`, which generates and writes the report.

    :param context_file: saved pipeline context file to restore
    :param aqua_file: AQUA report file to write
    """
    LOG.info('Opening context file: {!s}'.format(context_file))

    # Restoring the context is delegated entirely to the launcher.
    restored_pipeline = launcher.Pipeline(context=context_file)

    aqua_report_from_context(restored_pipeline.context, aqua_file)
def aqua_test_report_from_local_file(context_file, aqua_file):
    """
    Test AQUA report generation.

    The pipeline context file and web log directory must be in the same
    local directory.

    The context is restored with its ``name`` overridden by the context file
    name minus its extension, and its ``output_dir`` overridden by the
    current working directory.

    :param context_file: saved pipeline context file to restore
    :param aqua_file: AQUA report file to write
    """
    LOG.info('Opening context file: {!s} for test'.format(context_file))

    # Point the restored context at the local copy of the data.
    local_overrides = {
        'name': os.path.splitext(context_file)[0],
        'output_dir': os.getcwd(),
    }
    restored_pipeline = launcher.Pipeline(context=context_file,
                                          path_overrides=local_overrides)

    aqua_report_from_context(restored_pipeline.context, aqua_file)
def aqua_report_from_context(context, aqua_file):
    """
    Create an AQUA report from a context object and write it to disk.

    :param context: pipeline context
    :param aqua_file: AQUA report file to write
    """
    # The recipe name is logged as a fixed placeholder; only the stage count
    # is read from the context.
    LOG.info('Recipe name: %s' % 'Unknown')
    LOG.info(' Number of stages: %d' % context.task_counter)

    # Build the report XML with the ALMA-specific generator
    report_xml = AlmaAquaXmlGenerator().get_report_xml(context)

    LOG.info('Writing aqua report file: %s' % aqua_file)
    export_to_disk(report_xml, aqua_file)
class AlmaAquaXmlGenerator(aqua.AquaXmlGenerator):
    """
    Class for creating the AQUA pipeline report.

    Each overridden method takes the XML produced by the base class and adds
    ALMA-specific elements to it.
    """

    def __init__(self):
        super(AlmaAquaXmlGenerator, self).__init__()

    def get_project_structure(self, context):
        """
        Get the project structure XML, with the ALMA OUS identifiers added.

        :param context: pipeline context
        :return: XML for the project structure
        """
        # get base XML from base class
        root = super(AlmaAquaXmlGenerator, self).get_project_structure(context)

        # add our ALMA-specific elements
        ElementTree.SubElement(root, 'OusEntityId').text = context.project_structure.ous_entity_id
        ElementTree.SubElement(root, 'OusPartId').text = context.project_structure.ous_part_id
        ElementTree.SubElement(root, 'OusStatusEntityId').text = context.project_structure.ousstatus_entity_id

        return root

    def get_calibration_topic(self, context, topic_results):
        """
        Get the XML for the calibration topic.

        Flux measurements from hifa_gfluxscale results are appended to the
        XML produced by the base class.

        :param context: pipeline context
        :param topic_results: list of Results for this topic
        :return: XML for calibration topic
        """
        # get base XML from base class
        xml_root = super(AlmaAquaXmlGenerator, self).get_calibration_topic(context, topic_results)

        # task name -> (flux measurement accessor, QA score accessor)
        accessors = {
            'hifa_gfluxscale': (operator.attrgetter('measurements'), lambda r: str(r.qa.representative.score))
        }
        flux_xml = flux_xml_for_stages(context, topic_results, accessors)
        # extending with an element that has no children adds nothing, so no
        # emptiness check is needed here
        xml_root.extend(flux_xml)

        return xml_root

    def get_dataset_topic(self, context, topic_results):
        """
        Get the XML for the dataset topic.

        Flux measurements from hifa_importdata results and any sensitivity
        XML for the topic are appended to the XML produced by the base class.

        :param context: pipeline context
        :param topic_results: list of Results for this topic
        :return: XML for dataset topic
        """
        # get base XML from base class
        xml_root = super(AlmaAquaXmlGenerator, self).get_dataset_topic(context, topic_results)

        # task name -> (flux measurement accessor, QA score accessor); the
        # score for imported flux measurements is always reported as UNDEFINED
        accessors = {
            'hifa_importdata': (lambda x: x.setjy_results[0].measurements, lambda _: UNDEFINED),
        }
        flux_xml = flux_xml_for_stages(context, topic_results, accessors)
        # omit containing flux measurement element if no measurements were found
        if len(flux_xml) > 0:
            xml_root.extend(flux_xml)

        sensitivity_xml = aqua.sensitivity_xml_for_stages(context, topic_results)
        # omit containing element if no measurements were found
        if len(sensitivity_xml) > 0:
            xml_root.extend(sensitivity_xml)

        return xml_root

    def get_imaging_topic(self, context, topic_results):
        """
        Get the XML for the imaging topic.

        :param context: pipeline context
        :param topic_results: list of Results for this topic
        :return: XML for imaging topic
        :rtype: xml.etree.cElementTree.Element
        """
        # get base XML from base class
        xml_root = super(AlmaAquaXmlGenerator, self).get_imaging_topic(context, topic_results)

        sensitivity_xml = aqua.sensitivity_xml_for_stages(context, topic_results, name='ImageSensitivities')
        # omit containing element if no measurements were found
        if len(sensitivity_xml) > 0:
            xml_root.extend(sensitivity_xml)

        return xml_root
def flux_xml_for_stages(context, results, accessor_dict):
    """
    Get the XML for flux measurements contained in a list of results.

    This is a higher-order function: it is given a dict of accessor
    functions, which it uses to read the flux measurements and QA score from
    the appropriate results. A result is 'appropriate' for an entry when the
    result's task name matches the dict key, which lets different accessor
    functions be used for different types of result.

    The accessor dict uses task names as keys, with values as two-tuples
    comprising

    1. a function to access the flux measurements for a result
    2. a function to access the QA score for that result

    :param context: pipeline context
    :param results: results to process
    :param accessor_dict: dict of accessor functions
    :return: XML for flux measurements
    :rtype: xml.etree.cElementTree.Element
    """
    flux_root = ElementTree.Element('FluxMeasurements')

    for stage_result in results:
        casa_task = stage_result.pipeline_casa_task

        for task_name, (get_flux, get_score) in accessor_dict.items():
            # the opening parenthesis is part of the match so that a key such
            # as hifa_gfluxscale does not also match hifa_gfluxscaleflag
            if not casa_task.startswith(task_name + '('):
                continue

            stage_xml = xml_for_flux_stage(context, stage_result, task_name, get_flux, get_score)
            flux_root.append(stage_xml)

    return flux_root
def xml_for_flux_stage(context, stage_results, origin, accessor, score_accessor):
    """
    Get the XML for all flux measurements contained in a ResultsList.

    :param context: pipeline context
    :param stage_results: ResultList containing flux results to summarise
    :param origin: text for Origin attribute value
    :param accessor: function that returns the flux measurements from the Result
    :param score_accessor: function that returns the QA score for the Result
    :return: FluxMeasurements element carrying Origin and Score attributes
    :rtype: xml.etree.cElementTree.Element
    """
    # the score is read once, from the ResultsList as a whole
    stage_score = score_accessor(stage_results)
    stage_xml = ElementTree.Element('FluxMeasurements', Origin=origin, Score=stage_score)

    for ms_result in stage_results:
        ms_name = os.path.basename(ms_result.inputs['vis'])

        # data might have been imported from ASDM, .tar, .tgz file, in which
        # case the input name is not the measurement set name
        imported_from_non_ms = (
            'hifa_importdata(' in stage_results.pipeline_casa_task
            and not ms_name.endswith('.ms')
        )
        if imported_from_non_ms:
            ms_name = ms_result.mses[0].basename

        measurement_set = context.observing_run.get_ms(ms_name)
        flux_measurements = accessor(ms_result)

        # NOTE(review): xml_for_extracted_flux_measurements is neither defined
        # nor imported in the visible part of this module - confirm that it is
        # provided elsewhere.
        per_ms_xml = xml_for_extracted_flux_measurements(flux_measurements, measurement_set)
        stage_xml.extend(per_ms_xml)

    return stage_xml
def _hifa_preimagecheck_sensitivity_exporter(stage_results):
    """
    Collect the AQUA sensitivities of every result into one flat list.

    The XML exporter expects this function to return a list of dictionaries.

    :param stage_results: iterable of results, each exposing a
        ``sensitivities_for_aqua`` iterable
    :return: all sensitivities, in result order
    :rtype: list
    """
    return [
        sensitivity
        for stage_result in stage_results
        for sensitivity in stage_result.sensitivities_for_aqua
    ]
# Register the sensitivity exporter under both task names in the lookup table
# owned by the base AQUA module. The same exporter serves hifa_imageprecheck
# and hif_makeimages.
# NOTE(review): presumably this table is consulted by
# aqua.sensitivity_xml_for_stages - confirm against the base module.
aqua.TASK_NAME_TO_SENSITIVITY_EXPORTER['hifa_imageprecheck'] = _hifa_preimagecheck_sensitivity_exporter
aqua.TASK_NAME_TO_SENSITIVITY_EXPORTER['hif_makeimages'] = _hifa_preimagecheck_sensitivity_exporter