Source code for pipeline.hifv.tasks.flagging.renderer

import collections
import os
import shutil

import pipeline.h.tasks.common.flagging_renderer_utils as fru
import pipeline.h.tasks.common.displays.flagging as flagging
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.renderer.basetemplates as basetemplates
import pipeline.infrastructure.utils as utils
from . import displaycheckflag

LOG = logging.get_logger(__name__)

FlagTotal = collections.namedtuple('FlagSummary', 'flagged total')


[docs]class T2_4MDetailsFlagDeterVLARenderer(basetemplates.T2_4MDetailsDefaultRenderer): FlagTotal = collections.namedtuple('FlagSummary', 'flagged total') def __init__(self, uri='flagdetervla.mako', description='VLA Deterministic flagging', always_rerender=False): super(T2_4MDetailsFlagDeterVLARenderer, self).__init__( uri=uri, description=description, always_rerender=always_rerender)
[docs] def get_display_context(self, context, result): super_cls = super(T2_4MDetailsFlagDeterVLARenderer, self) ctx = super_cls.get_display_context(context, result) weblog_dir = os.path.join(context.report_dir, 'stage%s' % result.stage_number) flag_totals = {} non_science_agents = ['before', 'anos', 'shadow', 'intents'] # Note that the call to common.flagging_renderer_utils.flags_for_result for r in result: flag_totals = utils.dict_merge(flag_totals, fru.flags_for_result(r, context, non_science_agents=non_science_agents)) # copy template files across to weblog directory toggle_to_filenames = {'online' : 'fileonline', 'template' : 'filetemplate'} inputs = r.inputs for toggle, filenames in toggle_to_filenames.items(): src = inputs[filenames] if inputs[toggle] and os.path.exists(src): LOG.trace('Copying %s to %s' % (src, weblog_dir)) shutil.copy(src, weblog_dir) flagcmd_files = {} for r in result: # write final flagcmds to a file ms = context.observing_run.get_ms(r.inputs['vis']) flagcmds_filename = '%s-agent_flagcmds.txt' % ms.basename flagcmds_path = os.path.join(weblog_dir, flagcmds_filename) with open(flagcmds_path, 'w') as flagcmds_file: terminated = '\n'.join(r.flagcmds()) flagcmds_file.write(terminated) flagcmd_files[ms.basename] = flagcmds_path # # collect the agent names # agent_names = set() # for r in result: # agent_names.update([s['name'] for s in r.summaries]) # # # get agent names in execution order # order = ['before', 'online', 'template', 'autocorr', 'shadow', # 'intents', 'edgespw'] # agents = [s for s in order if s in agent_names] # return all agents so we get ticks and crosses against each one agents = ['before', 'anos', 'shadow', 'intents', 'online', 'template', 'autocorr', 'edgespw', 'clip', 'quack', 'baseband'] flagplots = {os.path.basename(r.inputs['vis']): self.flagplot(r, context) for r in result} ctx.update({ 'flags': flag_totals, 'agents': agents, 'dirname': weblog_dir, 'flagcmds': flagcmd_files, 'flagplots': flagplots}) return ctx
[docs] def flagplot(self, result, context): plotter = flagging.PlotAntsChart(context, result) return plotter.plot()
[docs] def flags_for_result(self, result, context): ms = context.observing_run.get_ms(result.inputs['vis']) summaries = result.summaries by_intent = self.flags_by_intent(ms, summaries) by_spw = self.flags_by_science_spws(ms, summaries) return {ms.basename : utils.dict_merge(by_intent, by_spw)}
[docs] def flags_by_intent(self, ms, summaries): # create a dictionary of scans per observing intent, eg. 'PHASE':[1,2,7] intent_scans = {} for intent in ('BANDPASS', 'PHASE', 'AMPLITUDE', 'TARGET'): # convert IDs to strings as they're used as summary dictionary keys intent_scans[intent] = [str(s.id) for s in ms.scans if intent in s.intents] # while we're looping, get the total flagged by looking in all scans intent_scans['TOTAL'] = [str(s.id) for s in ms.scans] total = collections.defaultdict(dict) previous_summary = None for summary in summaries: for intent, scan_ids in intent_scans.items(): flagcount = 0 totalcount = 0 for i in scan_ids: flagcount += int(summary['scan'][i]['flagged']) totalcount += int(summary['scan'][i]['total']) if previous_summary: flagcount -= int(previous_summary['scan'][i]['flagged']) ft = T2_4MDetailsFlagDeterVLARenderer.FlagTotal(flagcount, totalcount) total[summary['name']][intent] = ft previous_summary = summary return total
[docs] def flags_by_science_spws(self, ms, summaries): science_spws = ms.get_spectral_windows(science_windows_only=True) total = collections.defaultdict(dict) previous_summary = None for summary in summaries: flagcount = 0 totalcount = 0 for spw in science_spws: spw_id = str(spw.id) flagcount += int(summary['spw'][spw_id]['flagged']) totalcount += int(summary['spw'][spw_id]['total']) if previous_summary: flagcount -= int(previous_summary['spw'][spw_id]['flagged']) ft = T2_4MDetailsFlagDeterVLARenderer.FlagTotal(flagcount, totalcount) total[summary['name']]['SCIENCE SPWS'] = ft previous_summary = summary return total
# not used in 4.5.2+ and C3R4+
[docs]class T2_4MDetailstargetflagRenderer(basetemplates.T2_4MDetailsDefaultRenderer): def __init__(self, uri='vlatargetflag.mako', description='Targetflag (All targets through RFLAG)', always_rerender=False): super(T2_4MDetailstargetflagRenderer, self).__init__( uri=uri, description=description, always_rerender=always_rerender)
[docs] def get_display_context(self, context, results): super_cls = super(T2_4MDetailstargetflagRenderer, self) ctx = super_cls.get_display_context(context, results) weblog_dir = os.path.join(context.report_dir, 'stage%s' % results.stage_number) summary_plots = {} ''' for result in results: plotter = targetflagdisplay.targetflagSummaryChart(context, result) plots = plotter.plot() ms = os.path.basename(result.inputs['vis']) summary_plots[ms] = plots ''' ctx.update({'summary_plots': summary_plots, 'dirname': weblog_dir}) return ctx
[docs]class T2_4MDetailscheckflagRenderer(basetemplates.T2_4MDetailsDefaultRenderer): def __init__(self, uri='checkflag.mako', description='Checkflag summary', always_rerender=False): super(T2_4MDetailscheckflagRenderer, self).__init__(uri=uri, description=description, always_rerender=always_rerender)
[docs] def get_display_context(self, context, results): super_cls = super(T2_4MDetailscheckflagRenderer, self) ctx = super_cls.get_display_context(context, results) weblog_dir = os.path.join(context.report_dir, 'stage%s' % results.stage_number) summary_plots = {} for result in results: plotter = displaycheckflag.checkflagSummaryChart(context, result) plots = plotter.plot() ms = os.path.basename(result.inputs['vis']) summary_plots[ms] = plots ctx.update({'summary_plots': summary_plots, 'dirname': weblog_dir}) return ctx