import collections
import copy
import datetime
import functools
import itertools
import operator
import os
import uuid
import weakref
from typing import Callable, List, Set, Tuple
import cachetools
import intervaltree
from casatasks.private.callibrary import applycaltocallib
from . import casa_tools, launcher, logging, utils
LOG = logging.get_logger(__name__)

# Per-dimension data selections (vis, spw, field, intent, antenna). Converted
# to a CalTo by CalTo.from_caltoargs, which comma-joins each member.
CalToArgs = collections.namedtuple('CalToArgs', 'vis spw field intent antenna')

# struct used to link calapplication to the task and inputs that created it
CalAppOrigin = collections.namedtuple('CalAppOrigin', 'task inputs')

# observations before this date are considered Cycle 0 observations
CYCLE_0_END_DATE = datetime.datetime(year=2013, month=1, day=21)
class CalApplication(object):
    """
    CalApplication maps calibration tables and their application arguments to
    a target data selection, encapsulated as |CalFrom| and |CalTo| objects
    respectively.

    .. py:attribute:: calto

        the |CalTo| representing the data selection to which the calibration
        should apply.

    .. py:attribute:: calfrom

        the list of |CalFrom| representing the calibration and application
        parameters

    .. py:attribute:: origin

        the list of |CalAppOrigin| marking how this calibration was created

    .. |CalTo| replace:: :class:`CalTo`
    .. |CalFrom| replace:: :class:`CalFrom`
    .. |CalAppOrigin| replace:: :class:`CalAppOrigin`
    """

    # Substring -> VisCal guesses used by from_export when a caltable cannot
    # be opened. Checked in order; the first match wins.
    _VISCAL_HEURISTICS = (
        ('tsys', 'B TSYS'),
        ('bcal', 'B JONES'),
        ('gpcal', 'G JONES'),
        ('gcal', 'G JONES'),
        ('gacal', 'G JONES'),
    )

    def __init__(self, calto, calfrom, origin=None):
        """
        :param calto: the target data selection
        :param calfrom: a CalFrom or list of CalFroms; always stored as a list
        :param origin: a CalAppOrigin or list of them; always stored as a list
            (a missing origin is stored as [None])
        """
        self.calto = calto
        if not isinstance(calfrom, list):
            calfrom = [calfrom]
        self.calfrom = calfrom
        if not isinstance(origin, list):
            origin = [origin]
        self.origin = origin

    @staticmethod
    def _single_or_list(values):
        """
        Return the sole element of values when it holds exactly one item,
        otherwise return values unchanged. This mirrors how CASA applycal
        accepts either a scalar or a list for its per-table arguments.
        """
        return values[0] if len(values) == 1 else values

    @staticmethod
    def from_export(s):
        """
        Unmarshal a CalApplication from a string.

        The string is expected to be an ``applycal(...)`` call as written by
        :meth:`as_applycal`.

        :param s: the exported applycal call
        :rtype: the unmarshalled :class:`CalApplication` object
        :raises ValueError: if a gaintable cannot be opened and its type
            cannot be guessed from its name
        """
        # NOTE(security): eval() executes arbitrary Python contained in the
        # string. Only unmarshal calstate exports from trusted sources.
        d = eval(s.replace('applycal(', 'dict('))

        calto = CalTo(vis=d['vis'], field=d['field'], spw=d['spw'],
                      antenna=d['antenna'], intent=d['intent'])

        # wrap these values in a list if they are single valued,
        # eg. 'm31' -> ['m31']
        for key in ('gainfield', 'gaintable', 'interp'):
            if isinstance(d[key], str):
                d[key] = [d[key]]
        if isinstance(d['calwt'], bool):
            d['calwt'] = [d['calwt']]

        # do the same for spwmap. A bit more complicated, as a single valued
        # spwmap is a list of integers, or may not have any values at all.
        try:
            if not isinstance(d['spwmap'][0], list):
                d['spwmap'] = [d['spwmap']]
        except IndexError:
            d['spwmap'] = [d['spwmap']]

        zipped = zip(d['gaintable'], d['gainfield'], d['interp'],
                     d['spwmap'], d['calwt'])

        calfroms = []
        for (gaintable, gainfield, interp, spwmap, calwt) in zipped:
            if os.path.exists(gaintable):
                with casa_tools.TableReader(gaintable) as caltable:
                    viscal = caltable.getkeyword('VisCal')
            else:
                LOG.warning('Could not access %s. Using heuristics to '
                            'determine caltable type', gaintable)
                for tag, guess in CalApplication._VISCAL_HEURISTICS:
                    if tag in gaintable:
                        viscal = guess
                        break
                else:
                    raise ValueError(gaintable)

            caltype = CalFrom.get_caltype_for_viscal(viscal)
            calfrom = CalFrom(gaintable, gainfield=gainfield, interp=interp,
                              spwmap=spwmap, calwt=calwt, caltype=caltype)
            LOG.trace('Marking caltable \'%s\' as caltype \'%s\'',
                      gaintable, calfrom.caltype)
            calfroms.append(calfrom)

        return CalApplication(calto, calfroms)

    def as_applycal(self):
        """
        Get a representation of this object as a CASA applycal call.

        :rtype: string
        """
        args = {
            'vis': self.vis,
            'field': self.field,
            'intent': self.intent,
            'spw': self.spw,
            'antenna': self.antenna,
            'gaintable': self.gaintable,
            'gainfield': self.gainfield,
            'spwmap': self.spwmap,
            'interp': self.interp,
            'calwt': self.calwt
        }

        # scalar string arguments need quoting; lists and bools are written
        # with their Python repr
        for key in ('gaintable', 'gainfield', 'spwmap', 'interp', 'calwt'):
            if isinstance(args[key], str):
                args[key] = '\'%s\'' % args[key]

        return ('applycal(vis=\'{vis}\', field=\'{field}\', '
                'intent=\'{intent}\', spw=\'{spw}\', antenna=\'{antenna}\', '
                'gaintable={gaintable}, gainfield={gainfield}, '
                'spwmap={spwmap}, interp={interp}, calwt={calwt})'
                ''.format(**args))

    @property
    def antenna(self):
        """
        The antennas to which the calibrations apply.

        :rtype: string
        """
        return self.calto.antenna

    @property
    def calwt(self):
        """
        The calwt parameters to be used when applying these calibrations.

        :rtype: a scalar if representing 1 calibration, otherwise a list
        """
        return self._single_or_list([cf.calwt for cf in self.calfrom])

    def exists(self):
        """
        Test whether all calibration tables referred to by this application exist.

        :rtype: boolean
        """
        return all(os.path.exists(cf.gaintable) for cf in self.calfrom)

    @property
    def field(self):
        """
        The fields to which the calibrations apply.

        :rtype: string
        """
        return self.calto.field

    @property
    def gainfield(self):
        """
        The gainfield parameters to be used when applying these calibrations.

        :rtype: a scalar string if representing 1 calibration, otherwise a
                list of strings
        """
        return self._single_or_list([cf.gainfield for cf in self.calfrom])

    @property
    def gaintable(self):
        """
        The gaintable parameters to be used when applying these calibrations.

        :rtype: a scalar string if representing 1 calibration, otherwise a
                list of strings
        """
        return self._single_or_list([cf.gaintable for cf in self.calfrom])

    @property
    def intent(self):
        """
        The observing intents to which the calibrations apply.

        :rtype: string
        """
        return self.calto.intent

    @property
    def interp(self):
        """
        The interp parameters to be used when applying these calibrations.

        :rtype: a scalar string if representing 1 calibration, otherwise a
                list of strings
        """
        return self._single_or_list([cf.interp for cf in self.calfrom])

    @property
    def spw(self):
        """
        The spectral windows to which the calibrations apply.

        :rtype: string
        """
        return self.calto.spw

    @property
    def spwmap(self):
        """
        The spwmap parameters to be used when applying these calibrations.

        :rtype: a list of ints if representing 1 calibration, otherwise a
                list of such lists
        """
        # convert tuples back into lists for the CASA argument
        return self._single_or_list([list(cf.spwmap) for cf in self.calfrom])

    @property
    def vis(self):
        """
        The name of the measurement set to which the calibrations apply.

        :rtype: string
        """
        return self.calto.vis

    def __str__(self):
        return self.as_applycal()

    def __repr__(self):
        return 'CalApplication(%s, %s)' % (self.calto, self.calfrom)
class CalTo(object):
    """
    CalTo represents a target data selection to which a calibration can be
    applied.

    Every selection is stored as a string; None is treated as an empty
    selection for field, spw, antenna and intent. The spw and antenna
    selections are additionally passed through utils.find_ranges.
    """
    __slots__ = ('_antenna', '_intent', '_field', '_spw', '_vis')

    def __getstate__(self):
        # pickle state: (antenna, intent, field, spw, vis)
        return self._antenna, self._intent, self._field, self._spw, self._vis

    def __setstate__(self, state):
        (self._antenna, self._intent, self._field,
         self._spw, self._vis) = state

    @staticmethod
    def from_caltoargs(cta: CalToArgs) -> "CalTo":
        """Build a CalTo whose selections are the comma-joined members of cta."""
        def join(s: Set[str]) -> str:
            return ','.join(map(str, s))

        return CalTo(vis=join(cta.vis), field=join(cta.field),
                     spw=join(cta.spw), antenna=join(cta.antenna),
                     intent=join(cta.intent))

    def __init__(self, vis=None, field='', spw='', antenna='', intent=''):
        # assigned through the property setters below, which normalise values
        self.vis = vis
        self.field = field
        self.spw = spw
        self.antenna = antenna
        self.intent = intent

    @staticmethod
    def _as_text(value):
        """Return '' for None, otherwise the string form of value."""
        if value is None:
            return ''
        return str(value)

    @property
    def antenna(self):
        return self._antenna

    @antenna.setter
    def antenna(self, value):
        self._antenna = utils.find_ranges(CalTo._as_text(value))

    @property
    def field(self):
        return self._field

    @field.setter
    def field(self, value):
        self._field = CalTo._as_text(value)

    @property
    def intent(self):
        return self._intent

    @intent.setter
    def intent(self, value):
        self._intent = CalTo._as_text(value)

    @property
    def spw(self):
        return self._spw

    @spw.setter
    def spw(self, value):
        self._spw = utils.find_ranges(CalTo._as_text(value))

    @property
    def vis(self):
        return self._vis

    @vis.setter
    def vis(self, value=None):
        # unlike the other selections, None is stringified to 'None' here
        self._vis = str(value)

    def __repr__(self):
        template = "CalTo(vis='%s', field='%s', spw='%s', antenna='%s',intent='%s')"
        return template % (self.vis, self.field, self.spw, self.antenna, self.intent)
class CalFrom(object):
    """
    CalFrom represents a calibration table and the CASA arguments that should
    be used when applying that calibration table.

    Instances expose read-only properties and are pooled: constructing a
    CalFrom with the same gaintable, gainfield, interp, spwmap and calwt as a
    live instance returns that existing instance.

    .. py:attribute:: CALTYPES

        an enumeration of calibration table types identified by this code.

    .. py:attribute:: CALTYPE_TO_VISCAL

        mapping of calibration type to caltable identifier as stored in the
        table header

    .. py:attribute:: VISCAL

        mapping of calibration table header information to a description of
        that table type
    """
    CALTYPES = {
        'unknown': 0,
        'gaincal': 1,
        'bandpass': 2,
        'tsys': 3,
        'wvr': 4,
        'polarization': 5,
        'antpos': 6,
        'gc': 7,
        'opac': 8,
        'rq': 9,
        'swpow': 10,
        'finalcal': 11,
        'uvcont': 12,
        'amp': 13,
        'ps': 14,
        'otfraster': 15,
        'tecim': 16,
        'kcross': 17,
        'otf': 18,
    }

    CALTYPE_TO_VISCAL = {
        'gaincal': ('G JONES', 'GSPLINE', 'T JONES'),
        'bandpass': ('B JONES', 'BPOLY'),
        'tsys': ('B TSYS',),
        'antpos': ('KANTPOS JONES',),
        'uvcont': ('A MUELLER',),
        # 'amp': ('G JONES',),
        'ps': ('SDSKY_PS',),
        'otfraster': ('SDSKY_RASTER',),
        'otf': ('SDSKY_OTF',),
    }

    VISCAL = {
        'P JONES': 'P Jones (parallactic angle phase)',
        'T JONES': 'T Jones (polarization-independent troposphere)',
        'TF JONES': 'Tf Jones (frequency-dependent atmospheric complex gain)',
        'G JONES': 'G Jones (electronic Gain)',
        'B JONES': 'B Jones (bandpass)',
        'DGEN JONES': 'Dgen Jones (instrumental polarization)',
        'DFGEN JONES': 'Dfgen Jones (frequency-dependent instrumental polarization)',
        'D JONES': 'D Jones (instrumental polarization)',
        'DF JONES': 'Df Jones (frequency-dependent instrumental polarization)',
        'J JONES': 'J Jones (generic polarized gain)',
        'M MUELLER': 'M Mueller (baseline-based)',
        'MF MUELLER': 'Mf Mueller (closure bandpass)',
        'TOPAC': 'TOpac (Opacity corrections in amplitude)',
        'TFOPAC': 'TfOpac (frequency-dependent opacity)',
        'X MUELLER': 'X Mueller (baseline-based)',
        'X JONES': 'X Jones (antenna-based)',
        'XF JONES': 'Xf Jones (antenna-based)',
        'GLINXPH JONES': 'GlinXph Jones (X-Y phase)',
        'B TSYS': 'B TSYS (freq-dep Tsys)',
        'BPOLY': 'B Jones Poly (bandpass)',
        'GSPLINE': 'G Jones SPLINE (elec. gain)',
        'KANTPOS JONES': 'KAntPos Jones (antenna position errors)',
        'A MUELLER': 'A Mueller (baseline-based)',
    }

    # Hundreds of thousands of CalFroms can be created and stored in a context.
    # To save memory, CalFrom uses a Flyweight pattern, caching objects in
    # _CalFromPool and returning a shared immutable instance for CalFroms
    # constructed with the same arguments. The pool holds weak references, so
    # instances no longer referenced elsewhere drop out of it.
    _CalFromPool = weakref.WeakValueDictionary()

    @staticmethod
    def _calc_hash(gaintable, gainfield, interp, spwmap, calwt):
        """
        Generate a hash code for the given arguments.

        Distinct argument sets can produce the same value, so this is used for
        __hash__ only, never as an identity key.

        :rtype: integer
        """
        result = 17
        result = 37 * result + hash(gaintable)
        result = 37 * result + hash(gainfield)
        result = 37 * result + hash(interp)
        result = 37 * result + hash(spwmap)
        result = 37 * result + hash(calwt)
        return result

    def __new__(cls, gaintable=None, gainfield='', interp='linear,linear',
                spwmap=None, caltype='unknown', calwt=True):
        """
        Return the pooled CalFrom for these arguments, creating it if needed.

        :raises ValueError: if gaintable is None, gainfield or interp is not a
            string, or spwmap is not a list/tuple
        """
        if spwmap is None:
            spwmap = []

        if gaintable is None:
            raise ValueError('gaintable must be specified. Got None')

        if not isinstance(gainfield, str):
            raise ValueError('gainfield must be a string. Got %s' % str(gainfield))

        if not isinstance(interp, str):
            raise ValueError('interp must be a string. Got %s' % str(interp))

        if isinstance(spwmap, tuple):
            spwmap = list(spwmap)
        if not isinstance(spwmap, list):
            raise ValueError('spwmap must be a list. Got %s' % str(spwmap))
        # Flyweight instances should be immutable, so convert spwmap to a
        # tuple. This also makes spwmap hashable for the pool key and hash.
        spwmap = tuple(spwmap)

        caltype = caltype.lower()
        assert caltype in CalFrom.CALTYPES

        # Key the pool on the identifying arguments themselves rather than on
        # their combined hash: distinct argument sets can share a hash value
        # (e.g. hash(-1) == hash(-2) in CPython), and a hash-only key would
        # then hand back an instance with the wrong arguments. caltype is not
        # part of the key, consistent with __hash__, so a request differing
        # only in caltype reuses the earlier instance.
        pool_key = (gaintable, gainfield, interp, spwmap, calwt)
        obj = CalFrom._CalFromPool.get(pool_key)
        if obj is None:
            LOG.trace('Creating new CalFrom(gaintable=\'%s\', '
                      'gainfield=\'%s\', interp=\'%s\', spwmap=%s, '
                      'caltype=\'%s\', calwt=%s)',
                      gaintable, gainfield, interp, spwmap, caltype, calwt)
            obj = object.__new__(cls)
            obj.__gaintable = gaintable
            obj.__gainfield = gainfield
            obj.__interp = interp
            obj.__spwmap = spwmap
            obj.__caltype = caltype
            obj.__calwt = calwt
            LOG.debug('Adding new CalFrom to pool: %s', obj)
            CalFrom._CalFromPool[pool_key] = obj
            # Listing the pool costs O(pool size); only pay for it when the
            # trace message will actually be emitted.
            if LOG.isEnabledFor(logging.TRACE):
                LOG.trace('New pool contents: %s',
                          list(CalFrom._CalFromPool.items()))
        else:
            LOG.trace('Reusing existing CalFrom(gaintable=\'%s\', '
                      'gainfield=\'%s\', interp=\'%s\', spwmap=\'%s\', '
                      'caltype=\'%s\', calwt=%s)',
                      gaintable, gainfield, interp, spwmap, caltype, calwt)

        return obj

    __slots__ = ('__caltype', '__calwt', '__gainfield', '__gaintable',
                 '__interp', '__spwmap', '__weakref__')

    def __getstate__(self):
        return (self.__caltype, self.__calwt, self.__gainfield,
                self.__gaintable, self.__interp, self.__spwmap)

    def __setstate__(self, state):
        # Unpickling first calls __new__ with __getnewargs__, so the object
        # restored here is the pooled instance; its attributes are then
        # overwritten with the pickled state.
        (self.__caltype, self.__calwt, self.__gainfield, self.__gaintable,
         self.__interp, self.__spwmap) = state

    def __getnewargs__(self):
        # positional order matches the __new__ signature
        return (self.gaintable, self.gainfield, self.interp, self.spwmap,
                self.caltype, self.calwt)

    def __init__(self, *args, **kw):
        # all initialisation happens in __new__; pooled instances must not be
        # re-initialised
        pass

    @property
    def caltype(self):
        return self.__caltype

    @property
    def calwt(self):
        return self.__calwt

    @property
    def gainfield(self):
        return self.__gainfield

    @property
    def gaintable(self):
        return self.__gaintable

    @staticmethod
    def get_caltype_for_viscal(viscal):
        """
        Map a caltable VisCal header value (case-insensitive) to a caltype.

        :return: the matching key of CALTYPE_TO_VISCAL, or 'unknown'
        """
        s = viscal.upper()
        for caltype, viscals in CalFrom.CALTYPE_TO_VISCAL.items():
            if s in viscals:
                return caltype
        return 'unknown'

    @property
    def interp(self):
        return self.__interp

    @property
    def spwmap(self):
        return self.__spwmap

    # No __eq__ is defined, so equality is identity. Because instances are
    # pooled, CalFroms built from equal arguments are the same object while
    # any reference to it is alive.
    def __hash__(self):
        return CalFrom._calc_hash(self.gaintable, self.gainfield, self.interp,
                                  self.spwmap, self.calwt)

    def __repr__(self):
        return ('CalFrom(\'%s\', gainfield=\'%s\', interp=\'%s\', spwmap=%s, '
                'caltype=\'%s\', calwt=%s)' %
                (self.gaintable, self.gainfield, self.interp, self.spwmap,
                 self.caltype, self.calwt))
class CalToIdAdapter(object):
    """
    Resolve the textual selections of a CalTo into measurement set subtable
    identifiers, using the MS registered with the pipeline context.
    """

    def __init__(self, context, calto):
        self._context = context
        self._calto = calto

    @property
    def antenna(self):
        """IDs of the antennas selected by the CalTo."""
        selected = self.ms.get_antenna(self._calto.antenna)
        return [ant.id for ant in selected]

    @property
    def field(self):
        """
        Fields selected by the CalTo, as names when field names are unique
        within the MS and as numeric IDs otherwise.
        """
        selected = list(self.ms.get_fields(task_arg=self._calto.field))
        every_name = [fld.name for fld in self.ms.get_fields()]

        ### Activate the following line for NRO ###
        # return [f.id for f in selected]

        names_unique = len(every_name) == len(set(every_name))
        attribute = 'name' if names_unique else 'id'
        return [getattr(fld, attribute) for fld in selected]

    @property
    def intent(self):
        """The raw intent selection string held by the CalTo."""
        return self._calto.intent

    def get_field_intents(self, field_id, spw_id):
        """
        Return the intents shared by the user selection, the field and the
        spw. An empty intent selection means 'all intents of the field'.
        """
        fld = self._get_field(field_id)
        window = self._get_spw(spw_id)

        requested = self._calto.intent
        if requested == '':
            user_intents = fld.intents
        else:
            user_intents = frozenset(requested.split(','))

        return user_intents & fld.intents & window.intents

    @property
    def ms(self):
        """The measurement set named by the CalTo."""
        return self._context.observing_run.get_ms(self._calto.vis)

    @property
    def spw(self):
        """IDs of the spectral windows selected by the CalTo (science or not)."""
        windows = self.ms.get_spectral_windows(self._calto.spw,
                                               science_windows_only=False)
        return [window.id for window in windows]

    def _get_field(self, field_id):
        matches = self.ms.get_fields(task_arg=field_id)
        if len(matches) == 1:
            return matches[0]
        msg = "Illegal field ID '%s' for vis '%s'" % (field_id, self._calto.vis)
        LOG.error(msg)
        raise ValueError(msg)

    def _get_spw(self, spw_id):
        matches = self.ms.get_spectral_windows(spw_id, science_windows_only=False)
        if len(matches) == 1:
            return matches[0]
        msg = "Illegal spw ID '%s' for vis '%s'" % (spw_id, self._calto.vis)
        LOG.error(msg)
        raise ValueError(msg)

    def __repr__(self):
        template = "CalToIdAdapter(ms='%s', field='%s', intent='%s', spw=%s, antenna=%s)"
        return template % (self.ms.name, self.field, self.intent,
                           self.spw, self.antenna)
# CalState extends defaultdict. For defaultdicts to be pickleable, their
# default factories must be defined at the module level.
def _antenna_dim(): return []
def _intent_dim(): return collections.defaultdict(_antenna_dim)
def _field_dim(): return collections.defaultdict(_intent_dim)
def _spw_dim(): return collections.defaultdict(_field_dim)
def _ms_dim(): return collections.defaultdict(_spw_dim)
class DictCalState(collections.defaultdict):
    """
    DictCalState is a data structure used to map calibrations for all data
    registered with the pipeline.

    It is implemented as a multi-dimensional array indexed by data selection
    parameters (ms, spw, field, intent, antenna), with the end value being a
    list of CalFroms, representing the calibrations to be applied to that data
    selection.
    """

    def __init__(self, default_factory=_ms_dim):
        super(DictCalState, self).__init__(default_factory)
        # CalFroms hidden by global_remove; skipped whenever the state is read
        self._removed = set()

    def __reduce__(self):  # optional, for pickle support
        # Reuse defaultdict's reduce tuple, but put the removed set in the
        # state slot (index 2) so __setstate__ restores it on unpickling.
        super_state = super(DictCalState, self).__reduce__()
        return self.__class__, super_state[1], self._removed, super_state[3], super_state[4]

    def __setstate__(self, state):
        self._removed = state

    def global_remove(self, calfrom):
        """
        Mark a CalFrom as being removed from the calibration state. Rather than
        iterating through the registered calibrations, this adds the CalFrom to
        a set of objects to be ignored. When the calibrations are subsequently
        inspected, CalFroms marked as removed will be bypassed.

        :param calfrom: the CalFrom to remove
        :return: None
        """
        self._removed.add(calfrom)

    def global_reactivate(self, calfroms):
        """
        Reactivate a CalFrom that was marked as ignored through a call to
        global_remove.

        This will reactivate the CalFrom entry, making it appear at whatever
        index in the CalApplications that it was originally registered, e.g.
        if a CalFrom was 'deleted' via a call to global_remove and 3 more
        CalFroms were added to the CalState, when the CalFrom is reactivated
        it will appear in the original position - that is, before the 3
        subsequent CalFroms, rather than appearing at the end of the list.

        :param calfroms: the CalFroms to reactivate
        :type calfroms: a set of CalFrom objects
        :return: None
        """
        LOG.trace('Globally reactivating %s CalFroms: %s',
                  len(calfroms), calfroms)
        self._removed -= calfroms

    def get_caltable(self, caltypes=None):
        """
        Get the names of all caltables registered with this CalState.

        If an optional caltypes argument is given, only caltables of the
        requested type will be returned.

        :param caltypes: Caltypes should be one or a list of table
            types known in CalFrom.CALTYPES.
        :rtype: set of strings
        """
        if caltypes is None:
            caltypes = list(CalFrom.CALTYPES.keys())

        if isinstance(caltypes, str):
            caltypes = (caltypes,)

        for c in caltypes:
            assert c in CalFrom.CALTYPES

        calfroms = itertools.chain.from_iterable(self.merged().values())
        return {cf.gaintable for cf in calfroms if cf.caltype in caltypes}

    @staticmethod
    def dictify(dd):
        """
        Get a standard dictionary of the items in the tree.
        """
        return {k: (DictCalState.dictify(v) if isinstance(v, dict) else v)
                for k, v in dd.items()}

    def merged(self, hide_empty=False):
        """
        Collapse the calibration tree into a mapping of CalTo to CalFrom list.

        Data selections whose (non-removed) CalFrom lists hash identically are
        grouped together, and one CalTo is created per measurement set in each
        group.

        NOTE(review): the CalTo built for a group uses the union of the spw,
        field, intent and antenna values of all group members, so it can cover
        combinations that were never registered individually (e.g. spw 0 with
        field B when only (0, A) and (1, B) were registered). Confirm callers
        accept this approximation.

        :param hide_empty: if True, omit selections with no active CalFroms
        :rtype: dict mapping CalTo to list of CalFrom
        """
        hashes = {}
        flattened = self._flattened(hide_empty=hide_empty)
        for (calto_tup, calfrom) in flattened:
            # create a tuple, as lists are not hashable
            calfrom_hash = tuple(hash(cf) for cf in calfrom)
            if calfrom_hash not in hashes:
                LOG.trace('Creating new CalFrom hash for %s', calfrom)
                calto_args = CalToArgs(*[[x] for x in calto_tup])
                hashes[calfrom_hash] = (calto_args, calfrom)
            else:
                calto_args = hashes[calfrom_hash][0]
                for old_key, new_key in zip(calto_args, calto_tup):
                    if new_key not in old_key:
                        old_key.append(new_key)

        for calto_args, _ in hashes.values():
            for dimension in calto_args:
                dimension.sort()

        result = {}
        for calto_args, calfrom in hashes.values():
            for vis in calto_args.vis:
                calto = CalTo(vis=vis,
                              spw=self._commafy(calto_args.spw),
                              field=self._commafy(calto_args.field),
                              intent=self._commafy(calto_args.intent),
                              antenna=self._commafy(calto_args.antenna))
                result[calto] = calfrom

        return result

    def _commafy(self, l=()):
        """Join the string forms of the items of l with commas."""
        return ','.join(str(i) for i in l)

    def _flattened(self, hide_empty=True):
        """
        Yield (selection tuple, active CalFrom list) pairs for every leaf of
        the tree, with globally removed CalFroms filtered out.
        """
        active = ((ct_tuple, [cf for cf in cf_list if cf not in self._removed])
                  for (ct_tuple, cf_list) in utils.flatten_dict(self))

        if hide_empty:
            return ((ct_tuple, cf_list) for ct_tuple, cf_list in active
                    if cf_list)
        return active

    def as_applycal(self):
        """Render the non-empty merged state as newline-separated applycal calls."""
        calapps = [CalApplication(k, v)
                   for k, v in self.merged(hide_empty=True).items()]
        return '\n'.join(str(c) for c in calapps)

    def __str__(self):
        return self.as_applycal()

    def __repr__(self):
        return self.as_applycal()
class DictCalLibrary(object):
    """
    CalLibrary is the root object for the pipeline calibration state.

    It holds two CalStates: 'active' (calibrations to be pre-applied) and
    'applied' (calibrations already applied via the pipeline applycal task).
    """

    def __init__(self, context):
        self._context = context
        self._active = DictCalState()
        self._applied = DictCalState()

    def clear(self):
        """Discard both the active and the applied calibration state."""
        self._active = DictCalState()
        self._applied = DictCalState()

    def _add(self, calto, calfroms, calstate):
        """Append calfroms to every (spw, field, intent, antenna) cell selected by calto."""
        if not isinstance(calfroms, list):
            calfroms = [calfroms]

        calto = CalToIdAdapter(self._context, calto)
        ms_name = calto.ms.name

        for spw_id in calto.spw:
            for field_id in calto.field:
                for intent in calto.get_field_intents(field_id, spw_id):
                    for antenna_id in calto.antenna:
                        for cf in calfroms:
                            # now that we use immutable CalFroms, we don't
                            # need to deepcopy the object we are appending
                            calstate[ms_name][spw_id][field_id][intent][antenna_id].append(cf)

        # as_applycal() walks the whole state, so only build it when the trace
        # message will actually be emitted
        if LOG.isEnabledFor(logging.TRACE):
            LOG.trace('Calstate after _add:\n%s', calstate.as_applycal())

    def _calc_filename(self, filename=None):
        """Return filename, or <output_dir>/<context name>.calstate when it is '' or None."""
        if filename in ('', None):
            filename = os.path.join(self._context.output_dir,
                                    self._context.name + '.calstate')
        return filename

    def _export(self, calstate, filename=None):
        """Write calstate to filename as one applycal call per line."""
        filename = self._calc_filename(filename)

        calapps = [CalApplication(k, v) for k, v in calstate.merged().items()]

        with open(filename, 'w') as export_file:
            for ca in calapps:
                export_file.write(ca.as_applycal())
                export_file.write('\n')

    def _remove(self, calstate, calfrom, calto=None):
        """Remove calfrom globally (calto None) or from the cells selected by calto."""
        # If this is a global removal, as signified by the lack of a CalTo to
        # give any target data selection, we can simply mark the CalFrom as
        # removed
        if calto is None:
            calstate.global_remove(calfrom)

        # But if this is a partial removal, go through the dictionary
        # dimensions and remove it from the data selection specified by the
        # CalTo
        else:
            if not isinstance(calfrom, list):
                calfrom = [calfrom]

            calto = CalToIdAdapter(self._context, calto)
            ms_name = calto.ms.name

            for spw_id in calto.spw:
                for field_id in calto.field:
                    for intent in calto.get_field_intents(field_id, spw_id):
                        for antenna_id in calto.antenna:
                            current = calstate[ms_name][spw_id][field_id][intent][antenna_id]
                            for c in calfrom:
                                try:
                                    current.remove(c)
                                except ValueError:
                                    LOG.debug('%s not found in calstate', c)

        if LOG.isEnabledFor(logging.TRACE):
            LOG.trace('Calstate after _remove:\n%s', calstate.as_applycal())

    def add(self, calto, calfroms):
        """
        Register calfroms against the data selected by calto in the active
        CalState.

        If we are adding a previously removed CalFrom back into a CalState, we
        assume that the user really wants the previous CalFrom not to be
        ignored in future runs rather than adding a second entry for this
        CalFrom into the CalState.

        :param calto: the target data selection
        :param calfroms: a CalFrom or an iterable of CalFroms
        """
        # collections.Iterable was removed in Python 3.10; the ABC lives in
        # collections.abc
        from collections.abc import Iterable

        if not isinstance(calfroms, Iterable):
            calfroms = [calfroms]

        calfroms_to_reactivate = self._active._removed.intersection(set(calfroms))
        self._active.global_reactivate(calfroms_to_reactivate)

        calfroms_to_add = [cf for cf in calfroms if cf not in calfroms_to_reactivate]
        if calfroms_to_add:
            self._add(calto, calfroms_to_add, self._active)

    @property
    def active(self):
        """
        CalState holding CalApplications to be (pre-)applied to the MS.
        """
        return self._active

    @property
    def applied(self):
        """
        CalState holding CalApplications that have been applied to the MS via
        the pipeline applycal task.
        """
        return self._applied

    def export(self, filename=None):
        """
        Export the pre-apply calibration state to disk.

        The pre-apply calibrations held in the 'active' CalState will be
        written to disk as a set of equivalent applycal calls.
        """
        filename = self._calc_filename(filename)
        LOG.info('Exporting current calibration state to %s', filename)
        self._export(self._active, filename)

    def export_applied(self, filename=None):
        """
        Export the applied calibration state to disk.

        The calibrations held in the 'applied' CalState will be written to
        disk as a set of equivalent applycal calls.
        """
        filename = self._calc_filename(filename)
        LOG.info('Exporting applied calibration state to %s', filename)
        self._export(self._applied, filename)

    def get_calstate(self, calto, hide_null=True, ignore=None):
        """
        Get the calibration state for a target data selection.

        :param calto: the target data selection
        :param hide_null: accepted for interface compatibility; not used here
        :param ignore: names of CalFrom properties to reset to their defaults
            in the returned copies, so they do not distinguish CalFroms
        :rtype: DictCalState holding only the selected cells
        """
        if ignore is None:
            ignore = []

        # wrap the text-only CalTo in a CalToIdAdapter, which will parse the
        # CalTo properties and give us the appropriate subtable IDs to iterate
        # over
        id_resolver = CalToIdAdapter(self._context, calto)
        ms_name = id_resolver.ms.name

        result = DictCalState()
        for spw_id in id_resolver.spw:
            for field_id in id_resolver.field:
                for intent in id_resolver.get_field_intents(field_id, spw_id):
                    for antenna_id in id_resolver.antenna:
                        calfroms = self._active[ms_name][spw_id][field_id][intent][antenna_id]

                        # Make the hash function ignore the ignored properties
                        # by setting their value to the default (and equal)
                        # value.
                        calfrom_copies = [self._copy_calfrom(cf, ignore)
                                          for cf in calfroms
                                          if cf not in self._active._removed]

                        result[ms_name][spw_id][field_id][intent][antenna_id] = calfrom_copies

        return result

    def _copy_calfrom(self, to_copy, ignore=None):
        """Rebuild to_copy as a CalFrom, leaving properties named in ignore at their defaults."""
        if ignore is None:
            ignore = []

        calfrom_properties = ['caltype', 'calwt', 'gainfield', 'gaintable',
                              'interp', 'spwmap']

        copied = {k: getattr(to_copy, k) for k in calfrom_properties
                  if k not in ignore}

        return CalFrom(**copied)

    def import_state(self, filename=None, append=False):
        """
        Load calibrations from an exported calstate file into the active state.

        :param filename: file to read; defaults as for export()
        :param append: if False, the active state is cleared first
        """
        filename = self._calc_filename(filename)

        LOG.info('Importing calibration state from %s', filename)
        calapps = []
        with open(filename, 'r') as import_file:
            for line in import_file:
                if line.startswith('applycal('):
                    calapps.append(CalApplication.from_export(line))

        if not append:
            self._active = DictCalState()

        for calapp in calapps:
            LOG.debug('Adding %s', calapp)
            self.add(calapp.calto, calapp.calfrom)

        LOG.info('Calibration state after import:\n%s', self.active.as_applycal())

    def mark_as_applied(self, calto, calfrom):
        """Move calfrom for the calto selection from the active to the applied state."""
        self._remove(self._active, calfrom, calto)
        self._add(calto, calfrom, self._applied)

        LOG.debug('New calibration state:\n%s', self.active.as_applycal())
        LOG.debug('Applied calibration state:\n%s', self.applied.as_applycal())
# CalLibrary based on interval trees -----------------------------------------


def contiguous_sequences(l):
    """
    Group a sequence of numbers into runs of consecutive integers.

    The values are converted with int() and sorted first, e.g.
    [3, 1, 2, 7, 8] yields [1, 2, 3] then [7, 8].

    :param l: a sequence of int-convertible values
    :return: generator of lists of ints, one list per contiguous run
    """
    ordered = sorted(int(item) for item in l)
    run = []
    for value in ordered:
        # a run continues only while each value is exactly one more than the last
        if run and value != run[-1] + 1:
            yield run
            run = []
        run.append(value)
    if run:
        yield run
# intervals are not inclusive of the upper bound, hence the +1 on the right bound
def sequence_to_range(l):
    """
    Return the half-open (first, last + 1) interval bounds for l.

    l is expected to be a sorted, contiguous run such as those yielded by
    contiguous_sequences.
    """
    first, last = l[0], l[-1]
    return first, last + 1
def sequence_to_casa_range(seq):
    """
    Convert a sequence of integers into CASA range strings, one string per
    contiguous run, e.g. [1, 2, 3, 5] -> '1~3', '5'.

    :param seq: a sequence of int-convertible values
    :return: generator of CASA range strings
    """
    def as_casa_range(run):
        # compare sizes by value: 'is' on int literals relies on CPython's
        # small-int cache and triggers a SyntaxWarning
        size = len(run)
        if size == 0:
            return ''
        if size == 1:
            return '{}'.format(run[0])
        return '{}~{}'.format(run[0], run[-1])

    return (as_casa_range(run) for run in contiguous_sequences(seq))
class CalToIntervalAdapter(object):
    """
    Translate the textual selections of a CalTo into lists of half-open
    (begin, end) ID ranges for antenna, field, spw and intent.
    """

    def __init__(self, context, calto):
        self._context = context
        self._calto = calto

        ms = context.observing_run.get_ms(calto.vis)
        self.ms = ms

        def to_ranges(ids):
            # collapse IDs into contiguous runs, then into (begin, end) bounds
            return [sequence_to_range(run) for run in contiguous_sequences(ids)]

        self.antenna = to_ranges(ant.id for ant in ms.get_antenna(calto.antenna))
        self.field = to_ranges(fld.id for fld in ms.get_fields(task_arg=calto.field))
        self.spw = to_ranges(window.id for window in
                             ms.get_spectral_windows(calto.spw, science_windows_only=False))

        id_to_intent = get_intent_id_map(ms)
        intent_to_id = {name: intent_id for intent_id, name in id_to_intent.items()}

        if calto.intent == '':
            # no intent selection: one range over every intent ID
            # (assumes IDs run 0..N-1 — TODO confirm against get_intent_id_map)
            self.intent = [(0, len(intent_to_id))]
        else:
            # the conditional check for intent is required as task parameters may
            # specify an intent that is not in the MS, such as CHECK.
            requested = calto.intent.split(',')
            self.intent = to_ranges(intent_to_id[name] for name in requested
                                    if name in intent_to_id)

    def __str__(self):
        vis_name = os.path.basename(self._calto.vis)
        return 'CalToIntervalAdapter(ms={!r}, field={!r}, intent={!r}, spw={!r}, antenna={!r})'.format(
            vis_name, self.field, self.intent, self.spw, self.antenna)

    def __repr__(self):
        return 'CalToIntervalAdapter({!s}, {!s})'.format(self._context, self._calto)
def create_data_reducer(join):
    """
    Return a function that creates a new TimestampedData object containing the
    result of executing the given operation on two TimestampedData objects.

    The use case for this function is actually quite simple: perform an
    operation on two TimestampedData objects (add, subtract, etc.) and put the
    result in a new TimestampedData object.

    The resulting TimestampedData object has a creation time equal to that of
    the oldest input object.

    :param join: the function to call on the two input objects, invoked as
        join(oldest, newest)
    :return: function taking two TimestampedData objects
    """
    def m(td1, td2, join=join):
        # Order the pair with a single comparison. Calling min() and max()
        # separately returns td1 from both when the two objects compare
        # equal, which silently dropped td2 from the join.
        if td2 < td1:
            oldest, newest = td2, td1
        else:
            oldest, newest = td1, td2
        return TimestampedData(oldest.time, join(oldest, newest))
    return m
def merge_lists(join_fn=operator.add):
    """
    Return a function that combines the data of two TimestampedData-like
    objects by calling join_fn(oldest.data, newest.data).

    :param join_fn: binary function applied to the two data payloads
        (list concatenation by default)
    :return: function taking (oldest, newest)
    """
    def merge(oldest, newest):
        left, right = oldest.data, newest.data
        return join_fn(left, right)
    return merge
def merge_intervaltrees(on_intersect):
    """
    Return a function that merges the IntervalTrees held in two objects'
    ``data`` attributes, then splits overlaps and combines equal-range
    Intervals with on_intersect.

    :param on_intersect: the data_reducer passed to merge_equals for
        Intervals covering the same range
    :return: function taking (oldest, newest) and returning the merged tree
    """
    def merge(oldest, newest):
        combined = oldest.data | newest.data
        # break partially overlapping Intervals into identical ranges so that
        # merge_equals can reduce them
        combined.split_overlaps()
        combined.merge_equals(data_reducer=on_intersect)
        return combined
    return merge
def ranges(lst):
    """
    Yield a (begin, end) pair, end exclusive, for each run of consecutive
    integers in ``lst``.

    ``lst`` must be an indexable sequence of integers,
    e.g. [1, 2, 3, 5] yields (1, 4) then (5, 6).
    """
    offset = 0
    # members of one consecutive run share the same (value - position)
    runs = itertools.groupby(value - position for position, value in enumerate(lst))
    for _, run in runs:
        run_length = sum(1 for _ in run)
        first_value = lst[offset]
        offset += run_length
        yield (first_value, first_value + run_length)
def safe_join(vals, separator=','):
    """Join the str() form of every item in ``vals`` with ``separator``."""
    return separator.join(map(str, vals))
def merge_contiguous_intervals(tree):
    """
    Merge contiguous Intervals with the same value into one Interval.

    Intervals are grouped by their TimestampedData value. Within each group,
    every run of consecutive indexes becomes a single Interval that reuses an
    existing TimestampedData of that group, so its timestamp is kept.

    :param tree: an IntervalTree
    :return: new IntervalTree with merged Intervals
    """
    result = intervaltree.IntervalTree()

    # itertools.groupby only groups adjacent items, so sort by value first
    ordered = sorted(tree, key=tsd_accessor)
    for _, same_value in itertools.groupby(ordered, tsd_accessor):
        group_tree = intervaltree.IntervalTree(same_value)

        # flatten every Interval to the integer indexes it covers, then
        # recover the maximal runs of consecutive indexes
        covered = []
        for iv in sorted(group_tree):
            covered.extend(range(iv.begin, iv.end))

        for begin, end in ranges(covered):
            donors = sorted(group_tree[begin:end], key=tsd_accessor)
            result.add(intervaltree.Interval(begin, end, donors[0].data))

    return result
def tsd_accessor(interval):
    """
    Return the value wrapped by the TimestampedData stored in an Interval,
    i.e. ``interval.data.data``.
    """
    return interval.data.data
def defrag_interval_tree(tree):
    """
    Condense an IntervalTree by consolidating fragmented entries with the same
    value into contiguous Intervals.

    The tree is walked recursively. At the deepest level, where Interval
    values are not IntervalTrees, contiguous Intervals with equal values are
    merged by merge_contiguous_intervals. At every other level, each Interval
    is rebuilt around its defragmented child tree. The child stays wrapped in
    a TimestampedData carrying the original timestamp, so the result has the
    same structure as the input.

    :param tree: the IntervalTree to defragment
    :return: a new, defragmented IntervalTree
    """
    # if the intervals in this tree do not contain IntervalTrees, we're at the
    # final branch - the branch with the list of CalApplications. The
    # intervals in this branch can be merged
    leaf_values = [tsd_accessor(interval) for interval in tree]
    if not all(isinstance(v, intervaltree.IntervalTree) for v in leaf_values):
        return merge_contiguous_intervals(tree)

    # otherwise call recursively
    merged_tree = intervaltree.IntervalTree()
    for interval in tree:
        defragged_child = defrag_interval_tree(tsd_accessor(interval))
        # Re-wrap the child in TimestampedData as the input did (cf. trim_nd).
        # Storing the bare IntervalTree as Interval.data would break
        # tsd_accessor (interval.data.data) on the returned tree.
        new_value = TimestampedData(interval.data.time, defragged_child)
        merged_tree.add(intervaltree.Interval(interval.begin, interval.end, new_value))
    return merged_tree
def _list_difference(minuend, subtrahend):
    """Return the items of ``minuend`` not present in ``subtrahend``, in order."""
    return [item for item in minuend if item not in subtrahend]


def _nested_reducer(child_reducer):
    """Return a reducer that merges overlapping child IntervalTrees with ``child_reducer``."""
    return create_data_reducer(join=merge_intervaltrees(child_reducer))


# Reducers applied to overlapping Intervals when adding IntervalTrees: leaf
# (intent) lists are concatenated, and each enclosing dimension merges its
# child trees using the reducer of the dimension beneath it.
intent_add = create_data_reducer(join=merge_lists(join_fn=operator.add))
field_add = _nested_reducer(intent_add)
spw_add = _nested_reducer(field_add)
ant_add = _nested_reducer(spw_add)

# Reducers applied when subtracting IntervalTrees: leaf lists keep only the
# items absent from the other operand's list.
intent_sub = create_data_reducer(join=merge_lists(join_fn=_list_difference))
field_sub = _nested_reducer(intent_sub)
spw_sub = _nested_reducer(field_sub)
ant_sub = _nested_reducer(spw_sub)
def interval_to_set(interval):
    """
    Get all the indexes covered by an Interval.

    :param interval: object with integer ``begin`` and (exclusive) ``end``
    :return: set of the integers in [begin, end)
    """
    return {index for index in range(interval.begin, interval.end)}
def get_id_to_intent_fn(id_to_intent):
    """
    Return a function that can convert intent IDs to string intents.

    Takes a dict of dicts, first key mapping measurement set name and second
    key mapping numeric intent ID to string intent for that MS, e.g.
    {'a.ms': {0: 'PHASE', 1: 'BANDPASS'}}

    The returned function takes (vis, intent_ids) and returns a set of intent
    strings. When the IDs cover every intent of that MS, it returns the empty
    set, mirroring the '' (all intents) CalTo convention.

    :param id_to_intent: dict of vis : intent ID : string intent
    :return: function (vis, intent_ids) -> set of intents
    """
    def to_intents(vis, intent_ids):
        assert vis in id_to_intent
        lookup = id_to_intent[vis]

        covers_everything = all(intent_id in intent_ids for intent_id in lookup)
        if covers_everything:
            return set()
        return {lookup[intent_id] for intent_id in intent_ids}

    return to_intents
def get_id_to_field_fn(ms_to_id_to_field):
    """
    Return a function that can convert field IDs to field identifiers.

    Takes a dict of dicts, first key mapping measurement set name and second
    key mapping numeric field ID to field name, e.g.
    {'a.ms': {0: 'field 1', 1: 'field 2'}}

    Field names that are not unique within an MS are replaced by their
    numeric ID. The returned function takes (vis, field_ids) and returns a
    set of identifiers. When the IDs cover every field of that MS, it returns
    the empty set, meaning all fields.

    :param ms_to_id_to_field: dict of vis : field ID : field name
    :return: function (vis, field_ids) -> set of field names (or field IDs
        where names are not unique)
    """
    identifiers = {}
    for ms_name, id_to_field in ms_to_id_to_field.items():
        name_counts = collections.Counter(id_to_field.values())
        identifiers[ms_name] = {
            field_id: (field_name if name_counts[field_name] == 1 else field_id)
            for field_id, field_name in id_to_field.items()
        }

    def to_fields(vis, field_ids):
        assert vis in identifiers
        lookup = identifiers[vis]
        if all(field_id in field_ids for field_id in lookup):
            return set()
        return {lookup[field_id] for field_id in field_ids}

    return to_fields
def expand_interval(interval, calto_args, calto_fn):
    """
    Convert an Interval into the equivalent list of (CalTo, [CalFrom..])
    2-tuples.

    The IDs covered by the Interval are converted with the conversion
    function of the current dimension and bound to the partial CalToArgs
    constructor under that dimension's argument name. If the Interval holds
    another IntervalTree, expansion continues into it with the remaining
    dimensions. Otherwise the constructor is called and paired with the leaf
    value. See expand_intervaltree for the argument formats.

    :param interval: the Interval to convert
    :param calto_args: the list of (argument name, conversion function)
        2-tuples for the remaining dimensions
    :param calto_fn: the partial CalToArgs application
    :return: an iterable of (CalTo, [CalFrom..]) tuples
    """
    payload = tsd_accessor(interval)
    covered_ids = interval_to_set(interval)
    arg_name, convert = calto_args[0]
    bound_fn = functools.partial(calto_fn, **{arg_name: convert(covered_ids)})

    if not isinstance(payload, intervaltree.IntervalTree):
        # leaf reached: all constructor arguments are now bound
        return ((bound_fn(), payload),)
    return expand_intervaltree(payload, calto_args[1:], bound_fn)
def expand_intervaltree(tree, convert_fns, calto_fn):
    """
    Convert an IntervalTree into the equivalent list of (CalTo, [CalFrom..])
    2-tuples.

    The second argument is a list of 2-tuples, one per remaining dimension:
    (CalToArgs constructor argument for this dimension, value conversion
    function for this dimension). The conversion function takes a set of
    integer indexes and converts it to a suitable (probably more
    human-readable) value for that dimension, e.g. a conversion from field ID
    to field name. For a dimension that supplies the 'antenna' argument to
    CalToArgs and should prefix 'DV' to each antenna index, the tuple could be
    ('antenna', lambda ids: {'DV%s' % i for i in ids}).

    The third argument is the partially-applied CalToArgs constructor. A
    CalToArgs needs a number of arguments (vis, field, spw, etc.), each of
    which corresponds to a dimension of the IntervalTree and must be supplied
    at CalToArgs creation time. While iterating through the dimensions, not
    all constructor arguments are known yet, so object creation is delayed:
    each dimension adds its keyword to the partial application. At the final
    leaf node, all arguments have been bound, and calling the partial yields
    the CalToArgs.

    :param tree: the IntervalTree to convert
    :param convert_fns: the list of (argument name, conversion function)
        2-tuples for the remaining dimensions
    :param calto_fn: the partial CalToArgs application
    :return: a lazy iterable of (CalTo, [CalFrom..]) tuples
    """
    per_interval = (expand_interval(interval, convert_fns, calto_fn) for interval in tree)
    return itertools.chain.from_iterable(per_interval)
def expand_calstate_to_calapps(calstate: "IntervalCalState") -> List[Tuple[CalTo, List[CalFrom]]]:
    """
    Convert an IntervalCalState into a list of (CalToArgs, [CalFrom..]) tuples.

    Each MS tree is walked through its antenna, spw, field and intent
    dimensions. Field and intent IDs are converted to names for that MS while
    the MS is still known; antenna and spw IDs are passed through ``unit``.

    :param calstate: the IntervalCalState to convert
    :return: a list of 2-tuples, first element a CalToArgs, second element a
        list of CalFroms
    """
    field_lookup = get_id_to_field_fn(calstate.id_to_field)
    intent_lookup = get_id_to_intent_fn(calstate.id_to_intent)

    expanded: List[Tuple[CalTo, List[CalFrom]]] = []
    for vis in calstate:
        # vis is bound up front; the tree walk binds the remaining CalToArgs
        # arguments one dimension at a time
        seed = functools.partial(CalToArgs, vis={vis})
        # The ID conversions must happen here, while vis is a single MS. Once
        # vis becomes a set in the CalTo (needed to handle sessions), the
        # mapping can no longer be resolved.
        dimensions = (
            ('antenna', unit),
            ('spw', unit),
            ('field', functools.partial(field_lookup, vis)),
            ('intent', functools.partial(intent_lookup, vis)),
        )
        expanded.extend(expand_intervaltree(calstate[vis], dimensions, seed))

    return expanded
def _can_merge_selections(selection_a, selection_b):
    """
    Return True if two CalToArgs data selections can be merged by a
    per-dimension set union without changing their meaning.

    In CalToArgs produced by expand_calstate_to_calapps, an empty set in the
    field or intent dimension means "all". get_id_to_field_fn and
    get_id_to_intent_fn return set() when every ID of the MS is selected.
    A set union of such a wildcard with an explicit subset would silently
    narrow "all" down to that subset, so such pairs are not mergeable.
    """
    return all(bool(values_a) == bool(values_b)
               for values_a, values_b in zip(selection_a, selection_b))


def consolidate_calibrations(all_my_calapps):
    """
    Consolidate a list of (CalTo, [CalFrom..]) 2-tuples into a smaller set of
    equivalent applications by consolidating their data selection arguments.

    This function merges the data selections of CalTo objects that have the
    same calibration application, as determined by the values and data
    selection present in the CalFroms. A merge is accepted only if the
    merged selection does not overlap any selection registered with a
    different calibration. Selections whose empty-set wildcards do not line
    up are never merged (see _can_merge_selections).

    :param all_my_calapps: an iterable of (CalToArgs, [CalFrom..]) 2-tuples,
        each CalToArgs targeting a single MS
    :return: a list of (CalToArgs, [CalFrom..]) tuples
    :raises ValueError: if a CalToArgs targets more than one MS
    """
    # When faced with a large number of EBs, trying to merge calibrations
    # across all MSes results in a huge number of iterations - most of them
    # pointless as the caltables only apply to one MS. So, partition the
    # calapps, grouping them by MS, and merge within these partitions and not
    # across them.
    vis_to_calapps = collections.defaultdict(list)
    for calto, calfroms in all_my_calapps:
        if len(calto.vis) > 1:
            msg = 'Cannot handle calibrations that apply to multiple MSes'
            raise ValueError(msg)
        vis = tuple(calto.vis)[0]
        vis_to_calapps[vis].append((calto, calfroms))

    all_accepted = {}
    for vis, calapps_for_vis in vis_to_calapps.items():
        LOG.info('Consolidating calibrations for {}'.format(os.path.basename(vis)))

        # dict mapping an object hash to the object itself:
        # hash([CalFrom, ...]): [CalFrom, ...]
        hash_to_calfroms = {}
        # dict mapping from object hash to corresponding list of CalToArgs
        hash_to_calto_args = collections.defaultdict(list)

        # create our maps of hashes, which we need to test for overlapping data
        # selections
        for calto_args, calfroms in calapps_for_vis:
            if not calfroms:
                continue

            # create a tuple, as lists are not hashable
            hashable_calfroms = tuple(hash(cf) for cf in calfroms)
            hash_to_calto_args[hashable_calfroms].append(calto_args)

            if hashable_calfroms not in hash_to_calfroms:
                hash_to_calfroms[hashable_calfroms] = calfroms

        # dict that holds accepted data selections and their CalFroms
        accepted = {}
        for hashable_calfroms, calto_args in hash_to_calto_args.items():
            # assemble the other data selections (the other CalToArgs) which we
            # will use to search for conflicting data selections
            other_data_selections = []
            for v in [v for k, v in hash_to_calto_args.items() if k != hashable_calfroms]:
                other_data_selections.extend(v)

            for to_merge in calto_args:
                if hashable_calfroms not in accepted:
                    # first time round for this calibration application,
                    # therefore it can always be added as there will be nothing
                    # to merge
                    accepted[hashable_calfroms] = [(copy.deepcopy(to_merge), hash_to_calfroms[hashable_calfroms])]
                    continue

                for idx, (existing_calto, calfroms) in enumerate(accepted[hashable_calfroms]):
                    if not calfroms:
                        continue

                    # A set union would narrow an "all" wildcard down to the
                    # other selection's explicit subset, so try the next
                    # accepted selection instead.
                    if not _can_merge_selections(existing_calto, to_merge):
                        continue

                    proposed_calto = CalToArgs(*copy.deepcopy(existing_calto))
                    for proposed_values, to_merge_values in zip(proposed_calto, to_merge):
                        proposed_values.update(to_merge_values)

                    # if the merged data selection does not conflict with any
                    # of the explicitly registered data selections that require
                    # a different calibration application, then it is safe to
                    # add the merged data selection and discard the unmerged
                    # data selection
                    if not any(data_selection_contains(proposed_calto, other) for other in other_data_selections):
                        if LOG.isEnabledFor(logging.TRACE):
                            LOG.trace('No conflicting data selection detected')
                            LOG.trace('Accepting merged data selection: {!s}'.format(proposed_calto))
                            LOG.trace('Discarding unmerged data selection: {!s}'.format(to_merge))
                        accepted[hashable_calfroms][idx] = (proposed_calto, hash_to_calfroms[hashable_calfroms])
                        break

                else:
                    # we get here if none of the accepted data selections could
                    # be merged with the data selection in hand. In this case,
                    # it should be added as it stands, completely unaltered.
                    if LOG.isEnabledFor(logging.TRACE):
                        LOG.trace('Merged data selection conflicts with other registrations')
                        LOG.trace('Appending new unmerged data selection: {!s}'.format(to_merge))
                    unmergeable = (to_merge, hash_to_calfroms[hashable_calfroms])
                    accepted[hashable_calfroms].append(unmergeable)

        all_accepted.update(accepted)

    # dict values are lists, which we need to flatten into a single list
    result = []
    for selections in all_accepted.values():
        result.extend(selections)
    return result
def data_selection_contains(proposed, calto_args):
    """
    Return True if two data selections overlap.

    Two selections overlap when they share at least one value in every
    dimension (vis, antenna, field, spw, intent). An empty set in a
    dimension is a wildcard meaning "all", as produced by get_id_to_field_fn
    and get_id_to_intent_fn, so it overlaps any value in that dimension.

    :param proposed: data selection 1
    :type proposed: CalToArgs
    :param calto_args: data selection 2
    :type calto_args: CalToArgs
    :return: True if data selection 2 overlaps data selection 1
    """
    def overlaps(values_a, values_b):
        # set.isdisjoint would treat an empty (wildcard) set as overlapping
        # nothing, hiding conflicts with "all fields/intents" selections
        return not values_a or not values_b or not values_a.isdisjoint(values_b)

    return all(overlaps(getattr(proposed, dim), getattr(calto_args, dim))
               for dim in ('vis', 'antenna', 'field', 'spw', 'intent'))
def expand_calstate(calstate):
    """
    Convert an IntervalCalState into the equivalent consolidated list of
    (CalToArgs, [CalFrom..]) 2-tuples.

    This is the top-level entry point for converting a calibration state to
    2-tuples. The steps are:
      1. expand the state to per-interval (CalToArgs, [CalFrom..]) tuples;
      2. consolidate entries with identical calibrations;
      3. split entries targeting several MSes into one entry per MS, which
         keeps the export format readable;
      4. convert antenna and spw integer IDs to CASA range syntax,
         e.g. [1,2,3,4,6,8] => ['1~4','6','8'];
      5. join every argument into a comma-separated string.

    :param calstate: the IntervalCalState to convert
    :return: a list of (CalToArgs, [CalFrom..]) tuples with string arguments
    """
    consolidated = consolidate_calibrations(expand_calstate_to_calapps(calstate))

    exported = []
    for calto_args, calfroms in consolidated:
        for vis in calto_args.vis:
            single_ms = CalToArgs({vis}, *calto_args[1:])
            casa_args = CalToArgs(vis=single_ms.vis,
                                  antenna=sequence_to_casa_range(single_ms.antenna),
                                  spw=sequence_to_casa_range(single_ms.spw),
                                  field=single_ms.field,
                                  intent=single_ms.intent)
            as_strings = CalToArgs(*[safe_join(arg) for arg in casa_args])
            exported.append((as_strings, calfroms))
    return exported
def get_min_max(l, keyfunc=None):
    """
    Return (minimum, maximum + 1) of ``l``, optionally after mapping
    ``keyfunc`` over it.

    The result is used as an Interval range. Intervals exclude their upper
    bound, hence the +1.
    """
    values = list(map(keyfunc, l)) if keyfunc else l
    return min(values), max(values) + 1
def create_interval_tree(a):
    """
    Create an IntervalTree containing a set of Intervals.

    Each element of ``a`` is a 3-tuple (interval start, interval end,
    zero-argument function giving the value for that interval). The value
    function is called once per Interval.

    :param a: the iterable of argument tuples
    :return: IntervalTree
    """
    built = [intervaltree.Interval(lo, hi, make_value()) for lo, hi, make_value in a]
    return intervaltree.IntervalTree(built)
def create_interval_tree_nd(intervals, value_fn):
    """
    Create a multidimensional IntervalTree. Each Interval within the
    IntervalTree points to the next dimension, with the final Interval
    containing the value given by calling value_fn.

    Every value is wrapped in a TimestampedData. All of them share a single
    timestamp taken when this function is called.

    :param intervals: a list of Interval lists, starting with the range of
        the final (deepest) dimension and ending with the range of the root
        dimension
    :param value_fn: function that returns the value for the final dimension
    :return: an IntervalTree
    """
    stamp = datetime.datetime.now()

    def wrap(value):
        return TimestampedData(stamp, value)

    # children must exist before their parents, so build from the deepest
    # dimension upwards
    tree = create_interval_tree([(begin, end, lambda: wrap(value_fn()))
                                 for begin, end in intervals[0]])

    for dimension in intervals[1:]:
        # bind the current child tree explicitly rather than via closure
        tree = create_interval_tree([(begin, end, lambda child=tree: wrap(child))
                                     for begin, end in dimension])
    return tree
def create_interval_tree_for_ms(ms):
    """
    Create a new IntervalTree fitted to the dimensions of a measurement set.

    The intent dimension spans 0..len(ms.intents). The field, spw and antenna
    dimensions span the min..max+1 of the respective IDs in the MS. Each leaf
    starts with a new empty list.

    :param ms: the measurement set
    :return: an IntervalTree
    """
    by_id = operator.attrgetter('id')
    intent_range = (0, len(ms.intents))
    field_range = get_min_max(ms.fields, keyfunc=by_id)
    spw_range = get_min_max(ms.spectral_windows, keyfunc=by_id)
    antenna_range = get_min_max(ms.antennas, keyfunc=by_id)
    # create_interval_tree_nd expects the deepest dimension first
    return create_interval_tree_nd([[intent_range], [field_range], [spw_range], [antenna_range]], list)
def trim(tree, ranges):
    """
    Return an IntervalTree trimmed to the specified ranges.

    Ranges are specified as tuples of (begin, end), end exclusive. Intervals
    overlapping a range are kept, truncated to the range boundaries, and
    retain their original data.

    :param tree: the IntervalTree to trim
    :param ranges: a list of range tuples
    :return: the trimmed IntervalTree
    """
    insertions = set()
    for begin, end in ranges:
        # Locate Intervals overlapping the range, not just those completely
        # contained within it. Slice indexing is the non-strict overlap query
        # in both intervaltree 2.x and 3.x; IntervalTree.search() was removed
        # in intervaltree 3.0.
        overlapping = tree[begin:end]
        # truncate the Intervals to the range boundaries
        insertions.update(intervaltree.Interval(max(iv.begin, begin), min(iv.end, end), iv.data)
                          for iv in overlapping)
    return intervaltree.IntervalTree(insertions)
def trim_nd(tree, selection):
    """
    Return an IntervalTree with each dimension trimmed to the specified
    set of ranges.

    The data selection for each dimension is a sequence of (begin, end)
    tuples. The selection for the whole tree is a sequence of these
    per-dimension sequences, root dimension first. For example, the data
    selection

        [ [(1, 3)], [(0, 5), (7, 8)] ]

    would select 1-3 from the first dimension and 0-5 and 7 from the second
    dimension.

    :param tree: the IntervalTree to trim
    :param selection: the sequence of data selections for each dimension
    :return: the trimmed IntervalTree
    """
    trimmed_root = trim(tree, selection[0])
    remaining = selection[1:]
    if not remaining:
        return trimmed_root

    # The data is wrapped in TimestampedData, so each Interval is replaced
    # wholesale. The replacement keeps the timestamp and bounds but points at
    # the child tree trimmed to the remaining dimensions.
    rebuilt = {
        intervaltree.Interval(iv.begin, iv.end,
                              TimestampedData(iv.data.time, trim_nd(tsd_accessor(iv), remaining)))
        for iv in trimmed_root
    }
    trimmed_root.clear()
    trimmed_root.update(rebuilt)
    return trimmed_root
def get_intent_id_map(ms):
    """
    Get the mapping of intent ID to string intent for a measurement set.

    IDs are assigned in sorted intent order, so the mapping is stable for a
    given set of intents.

    :param ms: the measurement set to analyse
    :return: a dict of intent ID: intent
    """
    return {intent_id: intent for intent_id, intent in enumerate(sorted(ms.intents))}
class IntervalCalState(object):
    """
    CalState is a data structure used to map calibrations for all data
    registered with the pipeline.

    It is implemented as a multi-dimensional array indexed by data selection
    parameters (ms, spw, field, intent, antenna), with the end value being a
    list of CalFroms, representing the calibrations to be applied to that data
    selection.
    """

    def __init__(self):
        # MS name -> root IntervalTree. Dimensions nest antenna > spw > field
        # > intent, with lists of CalFroms at the intent leaves.
        self.data = {}
        # MS name -> {intent ID: intent string} (see get_intent_id_map)
        self.id_to_intent = {}
        # MS name -> {field ID: field name}
        self.id_to_field = {}
        # MS name -> value returned by get_calstate_shape(ms)
        self.shape = {}

    @staticmethod
    def from_calapplication(context, calto, calfroms):
        """
        Create a calstate registering ``calfroms`` against the data selected
        by ``calto``.

        :param context: the pipeline context
        :param calto: the CalTo data selection
        :param calfroms: a CalFrom or a list of CalFroms
        :return: a new IntervalCalState
        """
        if not isinstance(calfroms, list):
            calfroms = [calfroms]

        adapted = CalToIntervalAdapter(context, calto)
        # create_interval_tree_nd expects the deepest dimension first
        selection_intervals = [
            adapted.intent,
            adapted.field,
            adapted.spw,
            adapted.antenna
        ]
        selection = create_interval_tree_nd(selection_intervals, lambda: calfroms)

        calstate = IntervalCalState.create_from_context(context)
        ms = context.observing_run.get_ms(calto.vis)
        calstate.data[ms.name] = selection
        calstate.data[ms.name] = trim_to_valid_data_selection(calstate, ms.name)[ms.name]

        return calstate

    @staticmethod
    def create_from_context(context):
        """
        Create an empty calstate covering every MS registered in the context.

        :param context: the pipeline context
        :return: a new IntervalCalState
        """
        if LOG.isEnabledFor(logging.TRACE):
            LOG.trace('Creating new CalLibrary from context')

        # holds a mapping of numeric intent ID to string intent for each ms.
        id_to_intent = {ms.name: get_intent_id_map(ms)
                        for ms in context.observing_run.measurement_sets}

        # holds a mapping of numeric ID to field name for each ms.
        id_to_field = {ms.name: {field.id: field.name for field in ms.fields}
                       for ms in context.observing_run.measurement_sets}

        calstate = IntervalCalState()
        calstate.id_to_intent.update(id_to_intent)
        calstate.id_to_field.update(id_to_field)

        interval_trees = {ms.name: create_interval_tree_for_ms(ms)
                          for ms in context.observing_run.measurement_sets}
        calstate.data.update(interval_trees)

        # the shape is never modified and hence can be shared between calstates
        for ms in context.observing_run.measurement_sets:
            calstate.shape[ms.name] = get_calstate_shape(ms)

        calstate.data = trim_to_valid_data_selection(calstate)

        return calstate

    def clear(self):
        """Clear the IntervalTree of every registered MS in place."""
        for calstate in self.data.values():
            calstate.clear()
        # do NOT clear the id mapping dicts as without access to the context
        # we have no way to repopulate them.

    def trimmed(self, context, calto):
        """
        Return a copy of this IntervalCalState trimmed to the specified CalTo
        data selection.

        If the MS targeted by ``calto`` is not yet registered, an empty
        registration for it is first added to this calstate.

        :param context: the pipeline context
        :param calto: the CalTo data selection to trim to
        :return: a new IntervalCalState holding only the selected data
        """
        # wrap the text-only CalTo in a CalToIntervalAdapter, which will parse
        # the CalTo properties and give us the appropriate subtable IDs to
        # iterate over
        adapted = CalToIntervalAdapter(context, calto)

        # get the data selection as numeric IDs, root dimension first
        selection_intervals = [
            adapted.antenna,
            adapted.spw,
            adapted.field,
            adapted.intent
        ]

        # get a copy of this calstate trimmed to the CalTo data selection
        vis = adapted.ms.name

        # if the data has not been registered with the CalLibrary, create a
        # new and empty calibration application and register it, thereby
        # creating all the IntervalTrees necessary for the MS.
        if vis not in self.data:
            ms = context.observing_run.get_ms(vis)
            calto = CalTo(vis=ms.name)
            to_add = IntervalCalState.from_calapplication(context, calto, [])
            self.__iadd__(to_add)

        copied = copy.deepcopy(self.data[vis])
        trimmed = trim_nd(copied, selection_intervals)

        calstate = IntervalCalState()
        calstate.data[vis] = trimmed
        calstate.id_to_intent[vis] = self.id_to_intent[vis]
        calstate.id_to_field[vis] = self.id_to_field[vis]
        calstate.shape = self.shape
        return calstate

    def get_caltable(self, caltypes=None) -> Set[str]:
        """
        Get the names of all caltables registered with this CalState.

        If an optional caltypes argument is given, only caltables of the
        requested type will be returned.

        :param caltypes: Caltypes should be one or/a list of table
            types known in CalFrom.CALTYPES.
        :rtype: set of strings
        """
        if caltypes is None:
            caltypes = list(CalFrom.CALTYPES.keys())

        if isinstance(caltypes, str):
            caltypes = (caltypes,)

        for c in caltypes:
            assert c in CalFrom.CALTYPES

        return {calfrom.gaintable for calfroms in self.merged().values()
                for calfrom in calfroms
                if calfrom.caltype in caltypes}

    def merged(self, hide_empty=False):
        """
        Return the consolidated calibration state as a dict of
        CalToArgs: [CalFrom..], optionally dropping entries without CalFroms.
        """
        calapps = expand_calstate(self)
        if hide_empty:
            calapps = [ca for ca in calapps if len(ca[1]) > 0]
        # TODO dict is unnecessary. refactor all usages of this class to use
        # the tuple
        return dict(calapps)

    def export_to_casa_callibrary(self, ms, callibfile):
        """Write the non-empty calibrations to a CASA callibrary file."""
        calapps = (CalApplication(calto, calfroms)
                   for calto, calfroms in self.merged(hide_empty=True).items())

        append = False
        for calapp in calapps:
            casa_intents = utils.to_CASA_intent(ms, calapp.intent)
            applycaltocallib(callibfile, append=append, field=calapp.field, intent=casa_intents, spw=calapp.spw,
                             gaintable=calapp.gaintable, gainfield=calapp.gainfield, interp=calapp.interp,
                             spwmap=calapp.spwmap, calwt=calapp.calwt)
            append = True

    def as_applycal(self):
        """Return the non-empty calibrations as newline-separated applycal calls."""
        calapps = (CalApplication(calto, calfroms)
                   for calto, calfroms in self.merged(hide_empty=True).items())

        return '\n'.join([str(c) for c in calapps])

    def __str__(self):
        return self.as_applycal()

    def _combine(self, other, combine_fn):
        """
        Get the union of this object combined with another IntervalCalState,
        applying a function to any Intervals that overlap.

        Neither operand is modified.

        :param other: the other IntervalCalState
        :param combine_fn: the combining function to apply
        :return: IntervalCalState
        """
        calstate = IntervalCalState()

        # ensure that the other calstate is not considered equal to this
        # calstate, even if the values they hold are identical. This step
        # is required so that all entries are added to in the union
        # (my_root | other_root) operation, and ensures that arithmetic like
        # 'calstate_x - calstate_x = 0' holds true.
        marker = uuid.uuid4()
        other_marked = set_calstate_marker(other, marker)

        # Copy (rather than alias) the ID mapping and shape dicts. __add__
        # inserts entries for MSes only present in `other` into the result,
        # and those must not leak back into this left-hand operand.
        calstate.id_to_intent = dict(self.id_to_intent)
        calstate.id_to_field = dict(self.id_to_field)
        calstate.shape = dict(self.shape)

        for vis, my_root in self.data.items():
            if LOG.isEnabledFor(logging.TRACE):
                LOG.trace('Combining callibrary entries for {}'.format(os.path.basename(vis)))

            # adopt IntervalTrees present in just this object
            if vis not in other_marked.data:
                calstate.data[vis] = copy.deepcopy(self.data[vis])
                continue

            # get the union of IntervalTrees for MSes present in both objects
            other_root = other_marked.data[vis]
            union = my_root | other_root
            union.split_overlaps()
            union.merge_equals(data_reducer=combine_fn)

            calstate.data[vis] = union

        calstate.data = trim_to_valid_data_selection(calstate)

        # Unmark the result calstate, thus eliminating any residual uuids
        unmarked = set_calstate_marker(calstate, None)

        return unmarked

    def __add__(self, other):
        calstate = self._combine(other, ant_add)

        # Also adopt IntervalTrees only present in the other object. Deep-copy
        # them, as _combine does for trees only present in this object, so the
        # result does not share mutable trees with `other`.
        for vis, other_root in other.data.items():
            if vis not in self.data:
                calstate[vis] = copy.deepcopy(other_root)
                calstate.id_to_intent[vis] = other.id_to_intent[vis]
                calstate.id_to_field[vis] = other.id_to_field[vis]
                calstate.shape[vis] = other.shape[vis]

        return calstate

    def __iadd__(self, other):
        sum_state = self + other
        # adopt all properties from the added states
        self.data = sum_state.data
        self.id_to_field = sum_state.id_to_field
        self.id_to_intent = sum_state.id_to_intent
        self.shape = sum_state.shape
        return self

    def __sub__(self, other):
        return self._combine(other, ant_sub)

    def __getitem__(self, key):
        return self.data[key]

    def __setitem__(self, key, value):
        self.data[key] = value

    def __delitem__(self, key):
        del self.data[key]

    def __contains__(self, item):
        return item in self.data

    def __iter__(self):
        return iter(self.data)
def fix_cycle0_data_selection(context, calstate):
    """
    Work around unreliable intents in ALMA Cycle 0 data.

    For every calibration targeting an MS that started on or before
    CYCLE_0_END_DATE and selecting by intent, the intent is resolved to the
    matching fields. Those fields are written into the CalTo field selection.
    Calibrations for later MSes are re-registered unchanged.

    :param context: the pipeline context
    :param calstate: the IntervalCalState to fix
    :return: ``calstate`` itself when no MS is Cycle 0 data, otherwise a new
        IntervalCalState
    """
    # shortcut to minimise processing: nothing needs fixing unless at least
    # one MS is Cycle 0 data
    if all(utils.get_epoch_as_datetime(ms.start_time) > CYCLE_0_END_DATE
           for ms in context.observing_run.measurement_sets):
        return calstate

    final_calstate = IntervalCalState.create_from_context(context)

    # We can't trust Cycle 0 data intents. If this is Cycle 0 data we need
    # to resolve the intents to fields and add them to the CalTo data
    # selection to ensure that the correct data is selected.
    for calto, calfroms in calstate.merged().items():
        vis = calto.vis
        ms = context.observing_run.get_ms(vis)

        if utils.get_epoch_as_datetime(ms.start_time) > CYCLE_0_END_DATE:
            final_calstate += IntervalCalState.from_calapplication(context, calto, calfroms)
            continue

        if calto.intent != '':
            fields_with_intent = ms.get_fields(task_arg=calto.field, intent=calto.intent)
            field_names = {f.name for f in fields_with_intent}
            if len(field_names) == len(fields_with_intent):
                new_field_arg = ','.join(field_names)
            else:
                new_field_arg = ','.join([str(field.id) for field in fields_with_intent])

            # An empty field argument would select *all* fields, so keep the
            # original selection when no field matched.
            if fields_with_intent and new_field_arg != calto.field:
                LOG.info('Rewriting data selection to work around mislabeled Cycle 0 data intents. '
                         'Old field selection: %r; new field selection: %r', calto.field, new_field_arg)
                calto = CalTo(vis=calto.vis, field=new_field_arg, spw=calto.spw, antenna=calto.antenna,
                              intent=calto.intent)

        to_add = IntervalCalState.from_calapplication(context, calto, calfroms)
        final_calstate += to_add

    return final_calstate
[docs]class IntervalCalLibrary(object):
"""
CalLibrary is the root object for the pipeline calibration state.
"""
def __init__(self, context):
    """
    Create a library holding two separate calstates built from ``context``:
    the active (to be pre-applied) and the applied calibrations.
    """
    self._context = context
    self._active, self._applied = (IntervalCalState.create_from_context(context),
                                   IntervalCalState.create_from_context(context))
[docs] def clear(self):
self._active.clear()
self._applied.clear()
def _calc_filename(self, filename=None):
if filename in ('', None):
filename = os.path.join(self._context.output_dir,
self._context.name + '.calstate')
return filename
def _export(self, calstate, filename=None):
    """
    Write every entry of ``calstate.merged()`` to the file as an applycal
    call, one per line.
    """
    target = self._calc_filename(filename)
    entries = [CalApplication(calto, calfroms) for calto, calfroms in calstate.merged().items()]
    with open(target, 'w') as handle:
        for entry in entries:
            handle.write(entry.as_applycal() + '\n')
def add(self, calto, calfroms):
    """Register ``calfroms`` against the ``calto`` data selection in the active calstate."""
    self._active += IntervalCalState.from_calapplication(self._context, calto, calfroms)

    if LOG.isEnabledFor(logging.TRACE):
        LOG.trace('Calstate after _add:\n%s', self._active.as_applycal())
@property
def active(self):
"""
CalState holding CalApplications to be (pre-)applied to the MS.
"""
return self._active
@property
def applied(self):
"""
CalState holding CalApplications that have been applied to the MS via
the pipeline applycal task.
"""
return self._applied
def export(self, filename=None):
    """
    Export the pre-apply calibration state to disk.

    The calibrations in the 'active' CalState are written as a set of
    equivalent applycal calls. With no filename, the default
    '<output_dir>/<context name>.calstate' is used.
    """
    target = self._calc_filename(filename)
    LOG.info('Exporting current calibration state to %s', target)
    self._export(self._active, target)
def export_applied(self, filename=None):
    """
    Export the applied calibration state to disk.

    The calibrations in the 'applied' CalState are written as a set of
    equivalent applycal calls. With no filename, the default
    '<output_dir>/<context name>.calstate' is used.
    """
    target = self._calc_filename(filename)
    LOG.info('Exporting applied calibration state to %s', target)
    self._export(self._applied, target)
def get_calstate(self, calto, ignore=None):
    """
    Get the active calibration state for a target data selection.

    The result is a trimmed copy of the active calstate. Every CalFrom in it
    is replaced by a copy built via _copy_calfrom without the properties
    named in ``ignore``.

    :param calto: the data selection
    :param ignore: names of CalFrom properties to leave out of the copies
        (default: none)
    :return: an IntervalCalState trimmed to ``calto``
    """
    if ignore is None:
        ignore = []

    trimmed = self.active.trimmed(self._context, calto)

    # TODO replace with something like defrag_tree implementation
    # walk antenna > spw > field > intent down to the intent Intervals, whose
    # values are the CalFrom lists
    intent_intervals = (
        intent_iv
        for root in trimmed.data.values()
        for antenna_iv in root
        for spw_iv in tsd_accessor(antenna_iv)
        for field_iv in tsd_accessor(spw_iv)
        for intent_iv in tsd_accessor(field_iv)
    )
    for intent_iv in intent_intervals:
        calfroms = tsd_accessor(intent_iv)
        # replace the contents in place so the tree keeps the same list object
        calfroms[:] = [self._copy_calfrom(cf, ignore) for cf in calfroms]

    return trimmed
def _copy_calfrom(self, to_copy, ignore=None):
    """
    Build a new CalFrom from the properties of ``to_copy``.

    Properties whose names appear in ``ignore`` are not passed to the
    CalFrom constructor.

    :param to_copy: the CalFrom to copy
    :param ignore: optional list of property names to leave out
    :return: CalFrom instance
    """
    excluded = [] if ignore is None else ignore

    kwargs = {}
    for prop in ('caltype', 'calwt', 'gainfield', 'gaintable', 'interp', 'spwmap'):
        if prop not in excluded:
            kwargs[prop] = getattr(to_copy, prop)

    return CalFrom(**kwargs)
def import_state(self, filename=None, append=False):
    """
    Import a calibration state from disk into the active CalState.

    Every line of the file that starts with ``applycal(`` is parsed into a
    CalApplication via CalApplication.from_export and registered with add().

    :param filename: file to read; resolved through _calc_filename
    :param append: if False (the default), the active calibration state is
        cleared before the imported calibrations are added; if True, the
        imported calibrations are added on top of the existing active state
    """
    filename = self._calc_filename(filename)
    LOG.info('Importing calibration state from %s', filename)

    # Parse the whole file before touching the active state, so that a parse
    # error leaves the current calibration state intact.
    # NOTE(review): CalApplication.from_export evaluates each line with
    # eval(); only import state files from trusted sources.
    with open(filename, 'r') as import_file:
        calapps = [CalApplication.from_export(line)
                   for line in import_file
                   if line.startswith('applycal(')]

    if not append:
        for calstate in self._active.data.values():
            calstate.clear()

    for calapp in calapps:
        LOG.debug('Adding %s', calapp)
        self.add(calapp.calto, calapp.calfrom)

    LOG.info('Calibration state after import:\n%s', self.active.as_applycal())
def mark_as_applied(self, calto, calfrom):
    """
    Move a calibration from the active CalState to the applied CalState.

    :param calto: the data selection the calibration was applied to
    :param calfrom: the applied CalFrom(s)
    """
    applied_calstate = IntervalCalState.from_calapplication(self._context, calto, calfrom)
    self._active -= applied_calstate
    self._applied += applied_calstate

    # skip the (costly) applycal dumps unless DEBUG logging is enabled
    if not LOG.isEnabledFor(logging.DEBUG):
        return
    LOG.debug('New calibration state:\n%s', self.active.as_applycal())
    LOG.debug('Applied calibration state:\n%s', self.applied.as_applycal())
def unregister_calibrations(self, predicate_fn: Callable[[CalToArgs, CalFrom], bool]):
    """
    Delete active calibrations that match the input predicate function.

    Context
    =======
    Previously, calibration had to be removed by calling private callibrary
    functions, e.g.::

        calto = callibrary.CalTo(self.inputs.vis)
        calfrom = callibrary.CalFrom(gaintable=ktypecaltable, interp='', calwt=False)
        context.callibrary._remove(calto, calfrom, context.callibrary._active)

    This function makes calibration removal a first-class public function
    of the callibrary, and requires less knowledge of the calibration to
    remove.

    Example usage
    =============
    The predicate function passed in by the caller defines which
    calibrations should be unregistered. It is called with the CalToArgs of
    a data selection and one CalFrom registered against that selection, and
    should return True for calibrations to be removed. For example, Tsys
    caltable removal can be achieved with the code below::

        def match_tsys(calto, calfrom):
            return calfrom.caltype == 'tsys'

        context.callibrary.unregister_calibrations(match_tsys)

    The pipeline inserts the task name into the caltable filename, which can
    be used to unregister caltables generated by that task. For example::

        def match_task_caltable(calto, calfrom):
            return 'hifa_bandpass' in calfrom.gaintable

        context.callibrary.unregister_calibrations(match_task_caltable)

    If you wanted to match calwt, interp, vis, etc. then that could be done
    in the matcher function too, but if it's not necessary to identify the
    caltable then it doesn't need to be tested in the predicate function.

    :param predicate_fn: function taking (CalToArgs, CalFrom) and returning
        True when that calibration should be removed from the active state
    """
    to_remove = get_matching_calstate(self._context, self.active, predicate_fn)
    self._active -= to_remove
# Select the interval-tree based implementations as this module's CalState
# and CalLibrary. A dict-based pairing (DictCalState / DictCalLibrary) was
# previously selectable here instead.
CalState, CalLibrary = IntervalCalState, IntervalCalLibrary
class TimestampedData(collections.namedtuple('TimestampedDataBase', ['time', 'data', 'marker'])):
    """
    Immutable (time, data, marker) record, used as the data payload of the
    intervaltree Intervals held in an IntervalCalState.

    ``marker`` is optional and defaults to None. It takes part in equality
    but not in ordering: cmp(), < and > compare only ``time`` and ``data``,
    whereas == and != also compare ``marker``.

    Because __eq__ is overridden without defining __hash__, Python sets
    __hash__ to None for this class, so instances are unhashable.
    """
    __slots__ = ()  # Saves memory, avoiding the need to create __dict__ for each interval

    def __new__(cls, time, data, marker=None):
        return super(TimestampedData, cls).__new__(cls, time, data, marker)

    def cmp(self, other):
        """
        Tell whether other sorts before, after or equal to this object.

        Sorting is by time, then by data. If the data fields are not mutually
        comparable (a TypeError is raised), they are compared alphabetically
        by type name instead. The marker is not considered.

        :param other: TimestampedData to compare against
        :return: -1 if self sorts first, 1 if other sorts first, 0 if equal
        :rtype: int
        """
        if self.time != other.time:
            return -1 if self.time < other.time else 1
        try:
            if self.data == other.data:
                return 0
            return -1 if self.data < other.data else 1
        except TypeError:
            s = type(self.data).__name__
            o = type(other.data).__name__
            if s == o:
                return 0
            return -1 if s < o else 1

    def __lt__(self, other):
        """
        Less than operator, delegating to cmp().

        :param other: TimestampedData
        :return: True or False
        :rtype: bool
        """
        return self.cmp(other) < 0

    def __gt__(self, other):
        """
        Greater than operator, delegating to cmp().

        :param other: TimestampedData
        :return: True or False
        :rtype: bool
        """
        return self.cmp(other) > 0

    def __repr__(self):
        """
        Representation showing time, data and (when set) marker.

        :return: string representation
        :rtype: str
        """
        if self.marker is None:
            return 'TSD({0}, {1})'.format(self.time, repr(self.data))
        return 'TSD({0}, {1}, {2})'.format(self.time, repr(self.data), self.marker)

    def __str__(self):
        """
        Short representation showing only the data field.

        :return: string representation
        :rtype: str
        """
        return 'TSD({0})'.format(repr(self.data))

    def __eq__(self, other):
        """
        Whether other is a TimestampedData with equal time, data and marker.

        :param other: object to compare against
        :return: True or False
        :rtype: bool
        """
        if not isinstance(other, TimestampedData):
            return False
        return (self.time == other.time
                and self.data == other.data
                and self.marker == other.marker)

    def __ne__(self, other):
        """
        Inverse of __eq__.

        Defined explicitly because the tuple base class supplies its own
        __ne__, which would otherwise be used and could disagree with
        __eq__ (e.g. when compared against a plain tuple with equal fields).

        :param other: object to compare against
        :return: True or False
        :rtype: bool
        """
        return not self.__eq__(other)
def trim_to_valid_data_selection(calstate, vis=None):
    """
    Trim an IntervalCalState to the shape of valid (present) data selections.

    This is achieved by trimming Intervals for each dimension (antenna, spw,
    field, intent) to exclude ranges for which no data is present.

    See CAS-9415: CalLibrary needs a way to filter out calibration
    applications for missing data selections

    :param calstate: the calstate to shape
    :param vis: name of the measurement set to shape, or a list of names.
        If not defined, every measurement set in the calstate is shaped.
    :return: dict mapping each measurement set name to its new, trimmed
        antenna IntervalTree (the input calstate is not modified)
    """
    if vis is None:
        vislist = list(calstate.data.keys())
    else:
        vislist = [vis] if isinstance(vis, str) else vis

    results = {}
    for ms_name in vislist:
        antenna_tree = calstate.data[ms_name]
        new_root = intervaltree.IntervalTree()
        # union together the trimmed tree for every valid
        # (antenna, spw, field, intent) range combination of this MS
        for antenna_tuple in calstate.shape[ms_name]:
            for antenna_ranges, spw_tuple in antenna_tuple:
                for spw_ranges, field_tuple in spw_tuple:
                    for field_ranges, intent_ranges in field_tuple:
                        tree_intervals = (antenna_ranges, spw_ranges, field_ranges, intent_ranges)
                        new_root |= trim_nd(antenna_tree, tree_intervals)
        results[ms_name] = new_root

    return results
def _merge_intervals(unmerged):
    """
    Merge adjacent Intervals (represented by the keys within the input dict)
    that have identical values and output an IntervalTree-friendly tuple of
    constructor arguments.

    For example, a dict containing

        {1: A, 2: B, 3: A, 4: A}

    would be converted to

        ((((1, 2), (3, 5)), 'A'), (((2, 3),), 'B'))

    :param unmerged: a dict mapping IDs to values
    :return: tuple of constructor arguments ready for create_interval_tree_nd
    """
    # Invert the mapping to value -> set of IDs carrying that value. (Named so
    # as not to shadow the reversed() builtin.)
    ids_by_value = collections.defaultdict(set)
    for key, value in unmerged.items():
        ids_by_value[value].add(key)

    return tuple(sorted(
        (tuple(sequence_to_range(seq) for seq in contiguous_sequences(ids)), value)
        for value, ids in ids_by_value.items()
    ))
def _print_dimensions(calstate):
"""
Debugging function used to print the dimensions of an IntervalCalState.
:param calstate: the calstate to inspect
:return:
"""
for vis, antenna_tree in calstate.data.items():
for antenna_interval in antenna_tree.items():
antenna_ranges = (antenna_interval.begin, antenna_interval.end)
for spw_interval in antenna_interval.data.data:
spw_ranges = (spw_interval.begin, spw_interval.end)
for field_interval in spw_interval.data.data:
field_ranges = (field_interval.begin, field_interval.end)
for intent_interval in field_interval.data.data:
intent_ranges = (intent_interval.begin, intent_interval.end)
tree_intervals = (
os.path.basename(vis), antenna_ranges, spw_ranges, field_ranges, intent_ranges)
print('{!r}'.format(tree_intervals))
def set_calstate_marker(calstate, marker):
    """
    Return a deep copy of a calstate in which every TimestampedData at the
    intent (leaf) level carries the given marker.

    Technical details:

    CalFroms are flyweight objects, so two identical CalFroms have the same
    hash. Identical hashes stop the IntervalTree union function from working
    as expected: IntervalTrees are based on sets, and as such adding two
    lists of CalFrom with identical hashes results in just one CalFrom list
    in the final IntervalTree, when we actually *wanted* the duplicate to be
    added.

    This function is used to ensure that CalState arithmetic works as
    expected. Changing the TimestampedData marker makes otherwise identical
    leaves compare unequal, so 'identical' calibrations can be duplicated in
    the IntervalTree union operation and subsequently operated on in a
    merge_equals step.

    :param calstate: the calstate to copy (it is not modified)
    :param marker: the object to annotate the copy's leaves with
    :return: annotated copy of the calstate
    """
    annotated = copy.deepcopy(calstate)

    for antenna_tree in annotated.data.values():
        for antenna_interval in antenna_tree.items():
            for spw_interval in antenna_interval.data.data:
                for field_interval in spw_interval.data.data:
                    intent_tree = field_interval.data.data
                    originals = list(intent_tree)
                    # TimestampedData is immutable, so build re-marked
                    # replacement intervals and swap them into the tree
                    replacements = [
                        intervaltree.Interval(
                            iv.begin,
                            iv.end,
                            TimestampedData(time=iv.data.time, data=iv.data.data, marker=marker),
                        )
                        for iv in originals
                    ]
                    for iv in originals:
                        intent_tree.remove(iv)
                    for iv in replacements:
                        intent_tree.add(iv)

    return annotated
def _copy_calfrom(calfrom, **overrides):
    """
    Copy a CalFrom, overwriting any CalFrom properties with the specified
    override values.

    For instance, to create a copy of a CalFrom with calwt set to True::

        modified = _copy_calfrom(calfrom, calwt=True)

    The spwmap of the source is copied into a new list unless overridden.

    :param calfrom: CalFrom to copy
    :param overrides: kw/val pairs of CalFrom properties to override
    :return: CalFrom instance
    """
    new_kwargs = dict(gaintable=calfrom.gaintable, gainfield=calfrom.gainfield, interp=calfrom.interp,
                      spwmap=list(calfrom.spwmap), caltype=calfrom.caltype, calwt=calfrom.calwt)
    new_kwargs.update(overrides)
    return CalFrom(**new_kwargs)
def _copy_calto(calto, **overrides):
    """
    Copy a CalTo, overwriting any CalTo properties with the specified
    override values.

    For instance, to create a copy of a CalTo with spw set to 9::

        modified = _copy_calto(calto, spw=9)

    :param calto: CalTo to copy
    :param overrides: kw/val pairs of CalTo properties to override
    :return: CalTo instance
    """
    new_kwargs = dict(vis=calto.vis, field=calto.field, spw=calto.spw, antenna=calto.antenna, intent=calto.intent)
    new_kwargs.update(overrides)
    return CalTo(**new_kwargs)
def copy_calapplication(calapp, origin=None, **overrides):
    """
    Copy a CalApplication, overwriting any CalTo or CalFrom values with the
    given override values.

    For instance, to create a copy of a CalApplication with the CalFrom.calwt
    set to True and the CalTo.spw set to 9::

        modified = copy_calapplication(calapp, calwt=True, spw=9)

    CalTo overrides (vis, field, spw, antenna, intent) are applied to the
    CalTo; CalFrom overrides (gaintable, gainfield, interp, spwmap, caltype,
    calwt) are applied to every CalFrom. Any other override keyword is
    silently ignored.

    :param calapp: CalApplication to copy
    :param origin: origin to set, or None to copy the origin from calapp
    :param overrides: kw/val pairs of calto/calfrom attributes to override
    :return: CalApplication instance
    """
    if origin is None:
        origin = calapp.origin
        # CalApplication stores its origin as a list; shallow-copy it so the
        # new CalApplication does not share (and co-mutate) the source's list.
        if isinstance(origin, list):
            origin = list(origin)

    calto_kw = ('vis', 'field', 'spw', 'antenna', 'intent')
    calto_overrides = {k: v for k, v in overrides.items() if k in calto_kw}
    calto = _copy_calto(calapp.calto, **calto_overrides)

    calfrom_kw = ('gaintable', 'gainfield', 'interp', 'spwmap', 'caltype', 'calwt')
    calfrom_overrides = {k: v for k, v in overrides.items() if k in calfrom_kw}
    calfrom = [_copy_calfrom(cf, **calfrom_overrides) for cf in calapp.calfrom]

    return CalApplication(calto, calfrom, origin=origin)
# def consolidate_calibrations(calapps):
# """
# Consolidate a list of (CalTo, [CalFrom..]) 2-tuples into a smaller set of
# equivalent applications by consolidating their data selection arguments.
#
# This function works by merging the data selections of CalTo objects that
# have the same calibration application, as determined by the values and
# data selection present in the CalFroms.
#
# :param calapps: an iterable of (CalTo, [CalFrom..]) 2-tuples
# :return: a list of (CalTo, [CalFrom..]) tuples
# """
#
# # dict mapping an object hash to the object itself:
# # hash([CalFrom, ...]): [CalFrom, ...]
# hash_to_calfroms = {}
# # dict mapping from object hash to corresponding list of CalToArgs
# hash_to_calto_args = collections.defaultdict(list)
#
# # create our maps of hashes, which we need to test for overlapping data
# # selections
# for calto_args, calfroms in calapps:
# # create a tuple, as lists are not hashable
# calfrom_hash = tuple([hash(cf) for cf in calfroms])
# hash_to_calto_args[calfrom_hash].append(calto_args)
#
# if calfrom_hash not in hash_to_calfroms:
# hash_to_calfroms[calfrom_hash] = calfroms
#
# LOG.info('Consolidating calibrations')
# # dict that holds accepted data selections and their CalFroms
# accepted = {}
# for calfrom_hash, calto_args in hash_to_calto_args.items():
# # assemble the other data selections (the other CalToArgs) which we
# # will use to search for conflicting data selections
# other_data_selections = []
# for v in [v for k, v in hash_to_calto_args.items() if k != calfrom_hash]:
# other_data_selections.extend(v)
#
# for to_merge in calto_args:
# if calfrom_hash not in accepted:
# # first time round for this calibration application, therefore it can always be added
# # as there will be nothing to merge
# accepted[calfrom_hash] = [(copy.deepcopy(to_merge), hash_to_calfroms[calfrom_hash])]
# continue
#
# for idx, (existing_calto, calfroms) in enumerate(accepted[calfrom_hash]):
# proposed_calto = CalToArgs(*copy.deepcopy(existing_calto))
#
# for proposed_values, to_merge_values in zip(proposed_calto, to_merge):
# proposed_values.update(to_merge_values)
#
# # if the merged data selection does not conflict with any of
# # the explicitly registered data selections that require a
# # different calibration application, then it is safe to add
# # the merged data selection and discard the unmerged data
# # selection
# if not any((data_selection_contains(proposed_calto, other) for other in other_data_selections)):
# LOG.trace('No conflicting data selection detected')
# LOG.trace('Accepting merged data selection: {!s}'.format(proposed_calto))
# LOG.trace('Discarding unmerged data selection: {!s}'.format(to_merge))
# accepted[calfrom_hash][idx] = (proposed_calto, hash_to_calfroms[calfrom_hash])
# break
#
# else:
# # we get here if all of the proposed merged data selections
# # conflict with the data selection in hand. In this case, it
# # should be added as it stands, completely unaltered.
# LOG.trace('Merged data selection conflicts with other registrations')
# LOG.trace('Abandoning proposed data selection: {!s}'.format(proposed_calto))
# LOG.trace('Appending new unmerged data selection: {!s}'.format(to_merge))
# unmergeable = (to_merge, hash_to_calfroms[calfrom_hash])
# accepted[calfrom_hash].append(unmergeable)
#
# # dict values are lists, which we need to flatten into a single list
# result = []
# for l in accepted.itervalues():
# result.extend(l)
# return result
def _get_observed_intent_ids(ms, spw, field, intent_to_id):
    """
    Return the set of numeric intent IDs observed for a field in a spw.

    :param ms: the MeasurementSet to inspect
    :param spw: the spectral window
    :param field: the field
    :param intent_to_id: mapping of intent string to numeric intent ID
    :return: set of intent IDs
    """
    # We can't rely on field.intents alone, as that property aggregates all
    # intents across all spws, which may differ across spws when there are
    # multiple tunings. Materialise the scans since they are iterated twice.
    scans = list(ms.get_scans(spw=spw.id, field=field.id))

    # A set, so an intent seen in several scans is only counted once before
    # conversion to ranges.
    observed = {intent_to_id[i] for scan in scans for i in scan.intents}

    # SD scans can have subscans, where each subscan observes a different
    # field with a different intent, e.g., TARGET alternating with
    # REFERENCE. Non-SD data should have a single target per scan, so this
    # fallback should not trigger for non-SD data.
    subscan_fields = {scan_field for scan in scans for scan_field in scan.fields}
    if len(subscan_fields) > 1:
        # Unfortunately, we have to fall back to the field.intents method.
        # Expect this to break for multituning SD EBs.
        observed = {intent_to_id[i] for i in field.intents}

    return observed


@cachetools.cached(cachetools.LRUCache(50), key=operator.attrgetter('name'))
def get_calstate_shape(ms):
    """
    Get an IntervalTree shaped to the dimensions of the given measurement set.

    This function calculates the size of each metadata dimension (spw; intent;
    field; antenna), creating and returning an IntervalTree shaped to match.
    The output of this function is used to trim a calibration applied globally
    in one or more dimensions to a valid data selection.

    Output from this function is cached (keyed on ms.name) as it can take
    several seconds to calculate the result, which is done repeatedly when
    importing a calstate containing many entries.

    Note: this assumes that shape of an MS never changes, which should be
    true; the number of spws, fields, ants, etc. never changes.

    :param ms: the MeasurementSet to analyse
    :return: IntervalTree shaped to match valid data dimensions
    """
    LOG.debug('Calculating callibrary shape for %s', ms.basename)

    # holds a mapping of numeric intent ID to string intent
    id_to_intent = get_intent_id_map(ms)
    # create map of observing intent to intent ID by inverting existing map
    intent_to_id = {v: k for k, v in id_to_intent.items()}

    # create interval tree. root branch is antenna
    antenna_tree = create_interval_tree_for_ms(ms)

    spw_shape = {}
    for spw in ms.spectral_windows:
        intents_for_field = {}
        for field in ms.fields:
            if spw not in field.valid_spws:
                continue
            observed_intent_ids = _get_observed_intent_ids(ms, spw, field, intent_to_id)
            # convert the intent IDs to an IntervalTree-friendly range
            # and record it against the field ID
            intents_for_field[field.id] = tuple(
                sequence_to_range(seq) for seq in contiguous_sequences(observed_intent_ids)
            )
        # merge adjacent field intervals that have identical values
        spw_shape[spw.id] = _merge_intervals(intents_for_field)

    # merge adjacent spw intervals that have identical values
    spw_shape = _merge_intervals(spw_shape)

    # assume that spws are observed by all antennas. Note the trailing comma to make it a tuple!
    # the inner tuple is needed to convert the generator comprehension to objects
    antenna_shape = (tuple((((interval.begin, interval.end),), spw_shape) for interval in antenna_tree),)
    return antenna_shape
def get_matching_calstate(context: launcher.Context,
                          calstate: IntervalCalState,
                          predicate_fn: Callable[[CalToArgs, CalFrom], bool]) -> IntervalCalState:
    """
    Return an IntervalCalState containing the calibrations in the input
    IntervalCalState that match the predicate function.

    The use case for this function is to identify calibrations matching a
    pattern so that those calibrations can be deleted or modified. For
    instance, matching registered bandpass caltables so they can be removed
    from the active CalState.

    :param context: pipeline context (required to create IntervalCalState)
    :param calstate: calibration state to inspect
    :param predicate_fn: matching function, called once per (CalToArgs,
        CalFrom) pair in the expanded calstate, that returns True when that
        calibration is to be added to the output IntervalCalState
    :return: IntervalCalState combining all matching calibrations
    """
    expanded = expand_calstate_to_calapps(calstate)

    matching = [IntervalCalState.from_calapplication(context, CalTo.from_caltoargs(caltoargs), calfrom)
                for caltoargs, calfroms in expanded
                for calfrom in calfroms
                if predicate_fn(caltoargs, calfrom)]

    # Sum the per-calibration calstates, seeding the reduction with a calstate
    # created from the context so that a valid IntervalCalState is returned
    # even when nothing matches.
    consolidated = functools.reduce(operator.add, matching, IntervalCalState.create_from_context(context))
    return consolidated