"""
Created on 31 Jan 2017
@author: Vincent Geers (UKATC)
"""
import collections
import os
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.renderer.basetemplates as basetemplates
import pipeline.infrastructure.utils as utils
LOG = logging.get_logger(__name__)
FlagTotal = collections.namedtuple('FlagSummary', 'flagged total')
[docs]class T2_4MDetailsCorrectedampflagRenderer(basetemplates.T2_4MDetailsDefaultRenderer):
"""
Renders detailed HTML output for the Correctedampflag task.
"""
def __init__(self, uri='correctedampflag.mako',
description='Flag corrected - model amplitudes for calibrator.',
always_rerender=False):
super(T2_4MDetailsCorrectedampflagRenderer, self).__init__(
uri=uri, description=description, always_rerender=always_rerender)
[docs] def update_mako_context(self, mako_context, pipeline_context, results):
# Generate HTML reports, including flag commands.
htmlreports = self._get_htmlreports(pipeline_context, results)
# Generate flagging totals.
flag_totals = self._get_flag_totals(pipeline_context, results)
# Update the mako context.
mako_context.update(
{'htmlreports': htmlreports,
'flags': flag_totals,
})
def _get_flag_totals(self, context, results):
# Initialize items that are to be exported to the
# mako context.
flag_totals = collections.defaultdict(dict)
# For each result in the results list...
for result in results:
# Get flagging state before flagging.
flag_totals[result.vis]['before'] = self._flags_for_result(
result, context, summary='first')
# Get flagging state after flagging.
flag_totals[result.vis]['after'] = self._flags_for_result(
result, context, summary='last')
return flag_totals
def _get_htmlreports(self, context, results):
report_dir = context.report_dir
weblog_dir = os.path.join(report_dir, 'stage%s' % results.stage_number)
htmlreports = {}
for result in results:
flagcmd_abspath = self._write_flagcmd_to_disk(weblog_dir, result)
flagcmd_relpath = os.path.relpath(flagcmd_abspath, report_dir)
table_basename = os.path.basename(result.vis)
htmlreports[table_basename] = flagcmd_relpath
return htmlreports
def _write_flagcmd_to_disk(self, weblog_dir, result):
tablename = os.path.basename(result.vis)
filename = os.path.join(weblog_dir, '%s-flag_commands.txt' % tablename)
flagcmds = [l.flagcmd for l in result.flagcmds()]
with open(filename, 'w') as flagfile:
flagfile.writelines(['# Flag commands for %s\n#\n' % tablename])
flagfile.writelines(['%s\n' % cmd for cmd in flagcmds])
if not flagcmds:
flagfile.writelines(['# No flag commands generated\n'])
return filename
def _flags_for_result(self, result, context, summary=None):
ms = context.observing_run.get_ms(result.vis)
summaries = result.summaries
if summary == 'first':
# select only first summary, but keep as list
summaries = summaries[:1]
elif summary == 'last':
# select only last summary, but keep as list
summaries = summaries[-1:]
by_intent = self._flags_by_intent(ms, summaries)
by_spw = self._flags_by_spws(summaries)
return utils.dict_merge(by_intent, by_spw)
def _flags_by_intent(self, ms, summaries):
# create a dictionary of scans per observing intent, eg. 'PHASE':[1,2,7]
intent_scans = {}
for intent in ('BANDPASS', 'PHASE', 'AMPLITUDE', 'CHECK', 'POLARIZATION', 'POLANGLE', 'POLLEAKAGE', 'TARGET'):
# convert IDs to strings as they're used as summary dictionary keys
intent_scans[intent] = [str(s.id) for s in ms.scans
if intent in s.intents]
# while we're looping, get the total flagged by looking in all scans
intent_scans['TOTAL'] = [str(s.id) for s in ms.scans]
total = collections.defaultdict(dict)
previous_summary = None
for summary in summaries:
for intent, scan_ids in intent_scans.items():
flagcount = 0
totalcount = 0
for scan_id in scan_ids:
if scan_id in summary['scan']:
flagcount += int(summary['scan'][scan_id]['flagged'])
totalcount += int(summary['scan'][scan_id]['total'])
if previous_summary:
if scan_id in previous_summary['scan']:
flagcount -= int(previous_summary['scan'][scan_id]['flagged'])
ft = FlagTotal(flagcount, totalcount)
# The individual summaries may have been named differently from
# each other, but the renderer will expect a single summary, so
# consolidate summaries into a single summary named "Summary"
total['Summary'][intent] = ft
previous_summary = summary
return total
def _flags_by_spws(self, summaries):
total = collections.defaultdict(dict)
previous_summary = None
for summary in summaries:
flagcount = 0
totalcount = 0
for spw in summary['spw']:
try:
flagcount += int(summary['spw'][spw]['flagged'])
totalcount += int(summary['spw'][spw]['total'])
except:
pass
if previous_summary:
flagcount -= int(previous_summary['spw'][spw]['flagged'])
ft = FlagTotal(flagcount, totalcount)
total[summary['name']]['TSYS SPWS'] = ft
previous_summary = summary
return total