Source code for pipeline.hifa.tasks.gfluxscaleflag.renderer

"""
Created on 01 Jun 2017

@author: Vincent Geers (UKATC)
"""

import collections
import copy
import os

import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.renderer.basetemplates as basetemplates
from pipeline.h.tasks.applycal.renderer import copy_callibrary
from pipeline.hif.tasks.correctedampflag.renderer import T2_4MDetailsCorrectedampflagRenderer
from pipeline.infrastructure import basetask

LOG = logging.get_logger(__name__)


[docs]class T2_4MDetailsGfluxscaleflagRenderer(basetemplates.T2_4MDetailsDefaultRenderer): """ Renders detailed HTML output for the Gfluxscaleflag task. """ def __init__(self, uri='gfluxscaleflag.mako', description='Phased-up flux scale calibration + flagging', always_rerender=False): super(T2_4MDetailsGfluxscaleflagRenderer, self).__init__( uri=uri, description=description, always_rerender=always_rerender) # Attach correctedampflag renderer. self.cafrenderer = T2_4MDetailsCorrectedampflagRenderer( uri=uri, description=description, always_rerender=always_rerender)
[docs] def update_mako_context(self, mako_context, pipeline_context, results): # # Get flagging reports, summaries # cafresults = basetask.ResultsList() for result in results: cafresults.append(result.cafresult) cafresults.stage_number = results.stage_number self.cafrenderer.update_mako_context( mako_context, pipeline_context, cafresults) # # Get the diagnostic plots. # plot_dict = { 'time_plots': get_plot_dicts(pipeline_context, results, 'time'), 'uvdist_plots': get_plot_dicts(pipeline_context, results, 'uvdist') } mako_context.update(plot_dict) # PIPE-615: store callibrary tables in the weblog directory copy_callibrary(results, pipeline_context.report_dir)
[docs]def get_plot_dicts(pipeline_context, results, plot_type): """ Create the flagging diagnostic plots for the given x axis. :param pipeline_context: pipeline context :param results: results to create plots for :param plot_type: 'time' or 'uvdist' :return: """ # plots will be moved to this location plot_dest_dir = os.path.join(pipeline_context.report_dir, 'stage%s' % results.stage_number) d = collections.OrderedDict() for result in results: vis = os.path.basename(result.inputs['vis']) d[vis] = [] for idx, key in enumerate(['before', 'after']): if key in result.plots: plots = result.plots[key][plot_type] relocated = relocate_plots(plots, plot_dest_dir) # the weblog needs to identify each plot's associated plot type for p in relocated: p.parameters['type'] = key p.parameters['type_idx'] = idx d[vis].extend(relocated) return d
[docs]def relocate_plots(plots, dest_dir): """ Relocate a list of plots, returning a list of Plot objects that reflect the new location. :param plots: list of Plot objects :param dest_dir: location to move plots to :return: list of Plot objects """ # create a copy so that we do not alter the result, which we'd like to # keep in its original state plot_copies = copy.deepcopy(plots) if plot_copies and not os.path.exists(dest_dir): os.makedirs(dest_dir) # move PNGs to destination directory and modify Plot path to reflect the # new location for plot in plot_copies: src = plot.abspath dst = os.path.join(dest_dir, plot.basename) # we always need to modify the plot for rerender=True mode plot.abspath = dst # .. but we should only attempt to move the file if it exists if os.path.exists(src): os.rename(src, dst) return plot_copies