import copy
import os
from typing import List
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import caltable as bcaltable
from pipeline.hif.tasks.common import commoncalinputs as commoncalinputs
from pipeline.infrastructure.callibrary import CalApplication, CalFrom, CalToArgs
from pipeline.infrastructure.launcher import Context
LOG = infrastructure.get_logger(__name__)
[docs]class BandpassResults(basetask.Results):
"""
BandpassResults is the results class common to all pipeline bandpass
calibration tasks.
"""
def __init__(self,
final: List[CalApplication]=None,
pool: List[CalApplication]=None,
# TODO preceding was intended to hold child results but it
# does not appear to be used anywhere. I suspect it can be
# removed.
preceding: List[basetask.Results]=None,
applies_adopted: bool=False,
unregister_existing: bool=False):
"""
Construct and return a new BandpassResults.
Set applies_adopted to True if the bandpass calibration is adopted
from another measurement set. This can be the case for sessions,
where a bandpass calibrator is shared between multiple EBs. This
flag is for presentation logic and does not affect the calibration
itself.
:param final: calibrations to be applied by this task (optional)
:param pool: calibrations assessed by this task (optional)
:param preceding: DEPRECATED results from worker tasks executed by
this task (optional)
:param applies_adopted: True if this Results applies a bandpass
caltable generated from another measurement set
:param unregister_existing: True if existing bandpass calibrations
should be unregistered before registering new calibration
"""
if final is None:
final = []
if pool is None:
pool = []
if preceding is None:
preceding = []
super(BandpassResults, self).__init__()
self.pool: List[CalApplication] = []
self.final: List[CalApplication] = []
self.preceding: List[basetask.Results] = []
self.error = set()
self.qa = {}
self.applies_adopted: bool = applies_adopted
self.unregister_existing: bool = unregister_existing
# defensive programming: deepcopy the CalApplications as they're
# adopted just in case the caller updates them after this object is
# constructed.
self.pool.extend(copy.deepcopy(pool))
self.final.extend(copy.deepcopy(final))
self.preceding.extend(copy.deepcopy(preceding))
[docs] def merge_with_context(self, context: Context):
"""
See :method:`~pipeline.api.Results.merge_with_context`
"""
if not self.final:
LOG.error('No results to merge')
return
# If requested, unregister old bandpass calibrations before
# registering this one
if self.unregister_existing:
# Identify the MS to process.
vis: str = os.path.basename(self.inputs['vis'])
# predicate function to match bandpass caltables for this MS
def bandpass_matcher(calto: CalToArgs, calfrom: CalFrom) -> bool:
calto_vis = {os.path.basename(v) for v in calto.vis}
# Standard caltable filenames contain task identifiers,
# caltable type identifiers, etc. We can use this to identify
# caltables created by this task. As an extra check we also
# check the caltable type
do_delete = 'bandpass' in calfrom.gaintable and 'bandpass' in calfrom.caltype and vis in calto_vis
if do_delete:
LOG.info(f'Unregistering previous bandpass calibrations for {vis}')
return do_delete
context.callibrary.unregister_calibrations(bandpass_matcher)
for calapp in self.final:
LOG.debug(f'Adding calibration to callibrary:\n{calapp.calto}\n{calapp.calfrom}')
context.callibrary.add(calapp.calto, calapp.calfrom)
def __str__(self):
s = 'BandpassResults:\n'
for calapp in self.final:
s += f'\tBest caltable for spw #{calapp.spw} in {os.path.basename(calapp.vis)} is {calapp.gaintable}\n'
return s