import collections
import copy
import os.path
import numpy as np
import pipeline.infrastructure as infrastructure
LOG = infrastructure.get_logger(__name__)
[docs]class ResultAxis(object):
def __init__(self, name, units, data, channel_width=None):
self.name = name
self.units = units
self.data = data.copy()
if channel_width is None:
if len(data) > 1:
self.channel_width = data[1] - data[0]
else:
self.channel_width = 0
else:
self.channel_width = channel_width
[docs]class ResultBase(object):
@property
def fieldname(self):
if self._field_name is None:
return str(self.field_id)
else:
return self._field_name
@property
def description(self):
if self.time is None:
tstring = None
else:
# represent time sensibly relative to day start
t = self.time - 86400.0 * np.floor(self.time/86400.0)
h = int(np.floor(t/3600.0))
t -= h * 3600.0
m = int(np.floor(t/60.0))
t -= m * 60.0
s = int(np.floor(t))
tstring = '%sh%sm%ss' % (h, m, s)
temp = self.filename
if self.filename is not None:
temp = os.path.basename(self.filename)
fields = [('File', temp),
('Intent', self.intent),
('Field', self.fieldname),
('ID', self.field_id),
('SpW', self.spw),
('Pol', self.pol),
('Ant', self.ant),
('Time', tstring)]
return ' '.join('%s:%s' % (k) for k in fields if k[1] is not None)
[docs]class ImageResult(ResultBase):
def __init__(self, filename, data, datatype, axes, flag=None, nodata=None,
intent=None, field_id=None, field_name=None, spw=None,
pol=None, ant=None, units=None, time=None):
self.filename = filename
self.data = data.copy()
self.axes = axes
self.flag_reason_plane = np.zeros(np.shape(data), np.int)
self.flag_reason_key = {}
if flag is None:
self.flag = np.zeros(np.shape(self.data), np.bool)
else:
self.flag = flag.copy()
if nodata is None:
self.nodata = np.zeros(np.shape(self.data), np.bool)
else:
self.nodata = nodata.copy()
self.datatype = datatype
self.field_id = field_id
self._field_name = field_name
self.intent = intent
self.spw = spw
self.pol = pol
self.ant = ant
self.units = units
self.time = time
self.children = {}
[docs] def setflags(self, axisname=None, indices=None):
"""Set the flag array for specified axis values."""
if indices:
if self.axes[0].name == axisname:
self.flag[np.array(indices), :] = True
if self.axes[1].name == axisname:
self.flag[:, np.array(indices)] = True
[docs]class SpectrumResult(ResultBase):
def __init__(self, data, datatype, data_mad=None, axis=None, flag=None,
nodata=None, noisychannels=None, filename=None,
intent=None, field_id=None, field_name=None, spw=None,
pol=None, ant=None,
units=None, time=None, normalise=False):
self.filename = filename
if flag is None:
self.flag = np.zeros(np.shape(data), np.bool)
else:
self.flag = flag.copy()
valid_data = data[self.flag == False]
if len(valid_data) > 0:
self.median = np.median(valid_data)
else:
self.median = 0
if normalise and self.median > 0:
self.data = data / self.median
else:
self.data = data.copy()
if data_mad is None:
self.data_mad = np.zeros(np.shape(self.data))
else:
if normalise and self.median > 0:
self.data_mad = data_mad / self.median
else:
self.data_mad = data_mad.copy()
if axis is None:
self.axis = ResultAxis('channel', None,
np.arange(np.shape(self.data)[0]))
else:
self.axis = axis
if nodata is None:
self.nodata = np.zeros(np.shape(self.data), np.bool)
else:
self.nodata = nodata.copy()
if noisychannels is None:
self.noisychannels = np.zeros(np.shape(self.data), np.bool)
else:
self.noisychannels = np.array(noisychannels)
self.ant = ant
self.datatype = datatype
self.intent = intent
self.field_id = field_id
self._field_name = field_name
self.pol = pol
self.spw = spw
self.time = time
self.units = units
[docs]class QaResult(object):
def __init__(self):
self.overall_score = None
self.view_score = {}
self.view = collections.defaultdict(list)
self.flagging = []
self.spw = ''
[docs] def addview(self, description, viewresult):
self.view[description].append(viewresult)
[docs] def descriptions(self):
return list(self.view.keys())
[docs] def first(self, description):
return copy.deepcopy(self.view[description][0])
[docs] def flagcmds(self):
return self.flagging
[docs] def last(self, description):
return copy.deepcopy(self.view[description][-1])