"""
Created on 25 Nov 2014
@author: sjw
"""
import collections
import functools
from typing import Optional, List
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.utils as utils
from pipeline.infrastructure.launcher import Context
LOG = logging.get_logger(__name__)
FlagTotal = collections.namedtuple('FlagSummary', 'flagged total')
[docs]def flags_for_result(result,
context: Context,
intents_to_summarise: Optional[List[str]] = None,
non_science_agents: Optional[List[str]] = None):
if intents_to_summarise is None:
intents_to_summarise = ['BANDPASS', 'PHASE', 'AMPLITUDE', 'TARGET']
if non_science_agents is None:
non_science_agents = []
ms = context.observing_run.get_ms(result.inputs['vis'])
summaries = result.summaries
by_intent = flags_by_intent(ms, summaries, intents_to_summarise)
by_spw = flags_by_science_spws(ms, summaries)
merged = utils.dict_merge(by_intent, by_spw)
adjusted = adjust_non_science_totals(merged, non_science_agents)
return {ms.basename: adjusted}
[docs]def flags_by_intent(ms, summaries, intents=None):
# create a dictionary of scans per observing intent, eg. 'PHASE':['1','2','7']
intent_scans = {
# convert IDs to strings as they're used as summary dictionary keys
intent: [str(scan.id) for scan in ms.scans if intent in scan.intents]
for intent in intents
}
# while we're looping, get the total flagged by looking in all scans
intent_scans['TOTAL'] = [str(s.id) for s in ms.scans]
total = collections.defaultdict(dict)
previous_summary = None
for summary in summaries:
for intent, scan_ids in intent_scans.items():
flagcount = 0
totalcount = 0
for i in scan_ids:
if i not in summary['scan']:
continue
flagcount += int(summary['scan'][i]['flagged'])
totalcount += int(summary['scan'][i]['total'])
if previous_summary:
flagcount -= int(previous_summary['scan'][i]['flagged'])
ft = FlagTotal(flagcount, totalcount)
total[summary['name']][intent] = ft
previous_summary = summary
return total
[docs]def flags_by_science_spws(ms, summaries):
science_spws = ms.get_spectral_windows(science_windows_only=True)
total = collections.defaultdict(dict)
previous_summary = None
for summary in summaries:
flagcount = 0
totalcount = 0
for spw in science_spws:
spw_id = str(spw.id)
flagcount += int(summary['spw'][spw_id]['flagged'])
totalcount += int(summary['spw'][spw_id]['total'])
if previous_summary:
flagcount -= int(previous_summary['spw'][spw_id]['flagged'])
ft = FlagTotal(flagcount, totalcount)
total[summary['name']]['SCIENCE SPWS'] = ft
previous_summary = summary
return total
[docs]def adjust_non_science_totals(flagtotals, non_science_agents=None):
"""
Return a copy of the FlagSummary dictionaries, with totals reduced to
account for flagging performed by non-science flagging agents.
The incoming flagtotals report how much data was flagged per agent per
data selection. These flagtotals are divided into two groups: those whose
agent should be considered 'non-science' (and are indicated as such in the
non_science_agents argument) and the remainder. The total number of rows
flagged due to non-science agents is calculated and subtracted from the
total for each of the remainder agents.
"""
if not non_science_agents:
return flagtotals
agents_to_copy = set(non_science_agents)
agents_to_adjust = set(flagtotals.keys()) - agents_to_copy
data_selections = set()
for result in flagtotals.values():
data_selections.update(set(result.keys()))
# copy agents that use the total number of visibilities across to new
# results
adjusted_results = dict((agent, flagtotals[agent])
for agent in agents_to_copy
if agent in flagtotals)
# tot up how much data was flagged by each agent per data selection
flagged_non_science = {}
for data_selection in data_selections:
flagged_non_science[data_selection] = sum([v[data_selection].flagged
for v in adjusted_results.values()])
# subtract this 'number of rows flagged per data selection' from the total
# for the remaining agents
for agent in agents_to_adjust:
for data_selection in flagtotals[agent]:
unadjusted = flagtotals[agent][data_selection]
adjusted = FlagTotal(unadjusted.flagged,
unadjusted.total - flagged_non_science[data_selection])
if agent not in adjusted_results:
adjusted_results[agent] = {}
adjusted_results[agent][data_selection] = adjusted
return adjusted_results
[docs]def intents_to_summarise(context: Context) -> List[str]:
# Find out which intents to list in the flagging table
# First get all intents across all MSes in context
context_intents = functools.reduce(lambda x, m: x.union(m.intents),
context.observing_run.measurement_sets,
set())
# then match intents against those we want in the table, removing those not
# present. List order is preserved in the table.
all_flag_summary_intents = [
'AMPLITUDE', 'BANDPASS', 'CHECK', 'PHASE', 'POLANGLE', 'POLARIZATION', 'POLLEAKAGE', 'TARGET'
]
intents_to_summarise = [i for i in all_flag_summary_intents
if i in context_intents.intersection(set(all_flag_summary_intents))]
return intents_to_summarise