import os
import numpy
import collections
import abc
import pipeline.infrastructure.api as api
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.logging as logging
# from pipeline.domain.datatable import DataTableImpl as DataTable
from pipeline.infrastructure import casa_tools
from . import fitorder
from . import fragmentation
LOG = infrastructure.get_logger(__name__)
[docs]def TRACE():
return LOG.isEnabledFor(logging.LOGGING_LEVELS['trace'])
[docs]def DEBUG():
return LOG.isEnabledFor(logging.LOGGING_LEVELS['debug'])
[docs]class BaselineParamKeys(object):
ROW = 'row'
POL = 'pol'
MASK = 'mask'
CLIPNITER = 'clipniter'
CLIPTHRESH = 'clipthresh'
USELF = 'use_linefinder'
LFTHRESH = 'thresh'
LEDGE = 'Ledge'
REDGE = 'Redge'
AVG_LIMIT = 'avg_limit'
FUNC = 'blfunc'
ORDER = 'order'
NPIECE = 'npiece'
NWAVE = 'nwave'
ORDERED_KEY = [ROW, POL, MASK, CLIPNITER, CLIPTHRESH, USELF, LFTHRESH,
LEDGE, REDGE, AVG_LIMIT, FUNC, ORDER, NPIECE, NWAVE]
BLP = BaselineParamKeys
# @sdutils.profiler
[docs]def write_blparam(fileobj, param):
param_values = collections.defaultdict(str)
for key in BLP.ORDERED_KEY:
if key in param:
param_values[key] = param[key]
line = ','.join(map(str, [param_values[k] for k in BLP.ORDERED_KEY]))
#line = ','.join((str(param[k]) if k in param.keys() else '' for k in BLP.ORDERED_KEY))
fileobj.write(line+'\n')
[docs]def as_maskstring(masklist):
return ';'.join(['%s~%s' % (x[0], x[1]) for x in masklist])
[docs]def no_switching(engine, nchan, edge, num_pieces, masklist):
return 'cspline', 0
[docs]def do_switching(engine, nchan, edge, num_pieces, masklist):
return engine(nchan, edge, num_pieces, masklist)
[docs]class BaselineFitParamConfig(api.Heuristic, metaclass=abc.ABCMeta):
"""
Generate/update BLParam file according to the input parameters.
"""
ApplicableDuration = 'raster' # 'raster' | 'subscan'
MaxPolynomialOrder = 'none' # 'none', 0, 1, 2,...
PolynomialOrder = 'automatic' # 'automatic', 0, 1, 2, ...
def __init__(self, switchpoly=True):
super(BaselineFitParamConfig, self).__init__()
self.paramdict = {}
self.heuristics_engine = fitorder.SwitchPolynomialWhenLargeMaskAtEdgeHeuristic()
if switchpoly == True:
self.switching_heuristic = do_switching
else:
self.switching_heuristic = no_switching
# readonly attributes
@property
def ClipCycle(self):
return 1
is_multi_vis_task = False
def __datacolumn(self, vis):
colname = ''
if isinstance(vis, str):
with casa_tools.TableReader(vis) as tb:
candidate_names = ['CORRECTED_DATA',
'DATA',
'FLOAT_DATA']
for name in candidate_names:
if name in tb.colnames():
colname = name
break
return colname
[docs] def calculate(self, datatable, ms, antenna_id, field_id, spw_id, fit_order, edge, deviation_mask, blparam):
"""
Generate/update BLParam file, which will be an input to sdbaseline,
according to the input parameters.
Inputs:
datatable -- DataTable instance
ms -- MeasurementSet domain object
antenna_id -- Antenna ID to process
field_id -- Field ID to process
spw_id -- Spw ID to process
fit_order -- Fiting order ('automatic' or number)
edge -- Number of edge channels to be excluded from the heuristics
(format: [L, R])
deviation_mask -- Deviation mask
blparam -- Name of the BLParam file
File contents will be updated by this heuristics
Returns:
Name of the BLParam file
"""
LOG.debug('Starting BaselineFitParamConfig')
fragmentation_heuristic = fragmentation.FragmentationHeuristics()
# fitting order
if fit_order == 'automatic':
# fit order heuristics
LOG.info('Baseline-Fitting order was automatically determined')
self.fitorder_heuristic = fitorder.FitOrderHeuristics()
else:
LOG.info('Baseline-Fitting order was fixed to {}'.format(fit_order))
self.fitorder_heuristic = lambda *args, **kwargs: fit_order #self.inputs.fit_order
vis = ms.name
if DEBUG() or TRACE():
LOG.debug('MS "{}" ant {} field {} spw {}'.format(os.path.basename(vis), antenna_id, field_id, spw_id))
nchan = ms.spectral_windows[spw_id].num_channels
data_desc = ms.get_data_description(spw=spw_id)
npol = data_desc.num_polarizations
# edge must be formatted to [L, R]
assert isinstance(edge, list) and len(edge) == 2, 'edge must be a list [L, R]. "{0}" was given.'.format(edge)
if DEBUG() or TRACE():
LOG.debug('nchan={nchan} edge={edge}'.format(nchan=nchan, edge=edge))
if self.ApplicableDuration == 'subscan':
timetable_index = 1
else:
timetable_index = 0
index_list_total = []
# prepare mask arrays
mask_array = numpy.ones(nchan, dtype=int)
mask_array[:edge[0]] = 0
mask_array[nchan-edge[1]:] = 0
# deviation mask
if DEBUG() or TRACE():
LOG.debug('Deviation mask for field {} antenna {} spw {}: {}'.format(
field_id, antenna_id, spw_id, deviation_mask))
if deviation_mask is not None:
for mask_range in deviation_mask:
mask_array[max(0, mask_range[0]):min(nchan, mask_range[1] + 1)] = 0
base_mask_array = mask_array.copy()
#LOG.info('base_mask_array = {}'.format(''.join(map(str, base_mask_array))))
time_table = datatable.get_timetable(antenna_id, spw_id, None, os.path.basename(vis), field_id)
member_list = time_table[timetable_index]
# working with spectral data in scantable
nrow_total = sum((len(x[0]) for x in member_list))
LOG.info('Calculating Baseline Fitting Parameter...')
LOG.info('Processing {} spectra...'.format(nrow_total))
#colname = self.inputs.colname
datacolumn = self.__datacolumn(vis)
if DEBUG() or TRACE():
LOG.debug('data column name is "{}"'.format(datacolumn))
# open blparam file (append mode)
with open(blparam, 'a') as blparamfileobj:
with casa_tools.TableReader(vis) as tb:
for y in range(len(member_list)):
rows = member_list[y][0]
idxs = member_list[y][1]
#spectra = numpy.fromiter((tb.getcell(colname,row)
# for row in rows),
# dtype=numpy.float64)
#tsel = tb.query('ROWNUMBER() IN [%s]'%((','.join(map(str, rows)))), style='python')
#spectra = tsel.getcol()
#tsel.close()
#spectra = numpy.asarray([tb.getcell(colname, row).real for row in rows])
spectra = numpy.zeros((len(rows), npol, nchan,), dtype=numpy.float32)
for (i, row) in enumerate(rows):
spectra[i] = tb.getcell(datacolumn, row).real
#get_mask_from_flagtra: 1 valid 0 invalid
#arg for mask_to_masklist: 0 valid 1 invalid
#flaglist = [self._mask_to_masklist(-sdutils.get_mask_from_flagtra(tb.getcell('FLAGTRA', row))+1 )
flaglist = [self._mask_to_masklist(tb.getcell('FLAG', row).astype(int))
for row in rows]
#LOG.trace("Flag Mask = %s" % str(flaglist))
spectra[:, :edge[0], :] = 0.0
spectra[:, nchan-edge[1]:, :] = 0.0
# here we assume that masklist is polarization-independent
# (this is because that line detection/validation process accumulates
# polarization components together
masklist = [datatable.getcell('MASKLIST', idx) for idx in idxs]
#masklist = [datatable.getcell('MASKLIST',idxs[i]) + flaglist[i]
# for i in range(len(idxs))]
#LOG.debug('DONE {}'.format(y))
npol = spectra.shape[1]
for pol in range(npol):
# fit order determination
polyorder = self.fitorder_heuristic(
spectra[:, pol, :], [list(masklist[i]) + flaglist[i][pol] for i in range(len(idxs))], edge)
#del spectra
if fit_order == 'automatic' and self.MaxPolynomialOrder != 'none':
polyorder = min(polyorder, self.MaxPolynomialOrder)
#LOG.debug('time group {} pol {}: fitting order={}'.format(
# y, pol, polyorder))
# calculate fragmentation
(fragment, nwindow, win_polyorder) = fragmentation_heuristic(polyorder, nchan, edge)
nrow = len(rows)
if DEBUG() or TRACE():
LOG.debug('nrow = {}'.format(nrow))
LOG.debug('len(idxs) = {}'.format(len(idxs)))
for i in range(nrow):
row = rows[i]
idx = idxs[i]
if TRACE():
LOG.trace('===== Processing at row = {} ====='.format(row))
#nochange = datatable.getcell('NOCHANGE',idx)
#LOG.trace('row = %s, Flag = %s'%(row, nochange))
# mask lines
maxwidth = 1
# _masklist = masklist[i]
_masklist = list(masklist[i]) + flaglist[i][pol]
#LOG.info('_masklist = {}'.format(_masklist))
#LOG.info('masklist[{}] = {}'.format(i, masklist[i]))
#LOG.info('flaglist[{}][{}] = {}'.format(i, pol, flaglist[i][pol]))
#LOG.info('FLAG[{}][{}] = {}'.format(i, pol, ''.join(map(str, numpy.array(tb.getcell('FLAG', row)[pol], dtype=numpy.uint8)))))
for [chan0, chan1] in _masklist:
if chan1 - chan0 >= maxwidth:
maxwidth = int((chan1 - chan0 + 1) / 1.4)
# allowance in Process3 is 1/5:
# (1 + 1/5 + 1/5)^(-1) = (5/7)^(-1)
# = 7/5 = 1.4
max_polyorder = int((nchan - sum(edge)) // maxwidth + 1)
if TRACE():
LOG.trace('Masked Region from previous processes = {}'.format(
_masklist))
LOG.trace('edge parameters= {}'.format(edge))
LOG.trace('Polynomial order = {} Max Polynomial order = {}'.format(polyorder, max_polyorder))
# fitting
polyorder = min(polyorder, max_polyorder)
mask_array[:] = base_mask_array
#LOG.info('mask_array = {}'.format(''.join(map(str, mask_array))))
#irow = len(row_list_total)+len(row_list)
#irow = len(index_list_total) + i
irow = row
param = self._calc_baseline_param(irow, pol, polyorder, nchan, 0, edge, _masklist,
win_polyorder, fragment, nwindow, mask_array)
# definition of masklist differs in pipeline and ASAP
# (masklist = [a, b+1] in pipeline masks a channel range a ~ b-1)
param[BLP.MASK] = [[start, end-1] for [start, end] in param[BLP.MASK]]
param[BLP.MASK] = as_maskstring(param[BLP.MASK])
if TRACE():
LOG.trace('Row {}: param={}'.format(row, param))
write_blparam(blparamfileobj, param)
# MS rows contain npol spectra
if pol == 0:
index_list_total.extend(idxs)
return blparam
#@sdutils.profiler
def _calc_baseline_param(self, row_idx, pol, polyorder, nchan, modification, edge, masklist, win_polyorder,
fragment, nwindow, mask):
# Create mask for line protection
nchan_without_edge = nchan - sum(edge)
#LOG.info('__ mask (before) = {}'.format(''.join(map(str, mask))))
if isinstance(masklist, (list, numpy.ndarray)):
for [m0, m1] in masklist:
mask[max(0, m0):min(nchan, m1 + 1)] = 0
else:
LOG.critical('Invalid masklist')
#LOG.info('__ mask (after) = {}'.format(''.join(map(str, mask))))
num_mask = int(nchan_without_edge - numpy.sum(mask[edge[0]:nchan-edge[1]] * 1.0))
# here meaning of "masklist" is changed
# masklist: list of channel ranges to be *excluded* from the fit
# masklist_all: list of channel ranges to be *included* in the fit
masklist_all = self.__mask_to_masklist(mask)
#LOG.info('__ masklist (before)= {}'.format(masklist))
#LOG.info('__ masklist (after) = {}'.format(masklist_all))
if TRACE():
LOG.trace('nchan_without_edge, num_mask, diff={}, {}'.format(
nchan_without_edge, num_mask))
outdata = self._get_param(row_idx, pol, polyorder, nchan, mask, edge, nchan_without_edge, num_mask, fragment,
nwindow, win_polyorder, masklist_all)
if TRACE():
LOG.trace('outdata={}'.format(outdata))
return outdata
def _mask_to_masklist(self, mask):
return [self.__mask_to_masklist(m) for m in mask]
def __mask_to_masklist(self, mask):
"""
Converts mask array to masklist
Resulting masklist is a list of channel ranges whose values are 1
Argument
mask : an array of channel mask in values 0 or 1
"""
# get indices of clump boundaries
idx = (mask[1:] ^ mask[:-1]).nonzero()
idx = (idx[0] + 1)
# idx now contains pairs of start-end indices, edges need handling
# depending on first and last mask value
if mask[0]:
if len(idx) == 0:
return [[0, len(mask)]]
r = [[0, idx[0]]]
if len(idx) % 2 == 1:
r.extend(idx[1:].reshape(-1, 2).tolist())
else:
r.extend(idx[1:-1].reshape(-1, 2).tolist())
else:
if len(idx) == 0:
return []
if len(idx) % 2 == 1:
r = (idx[:-1].reshape(-1, 2).tolist())
else:
r = (idx.reshape(-1, 2).tolist())
if mask[-1]:
r.append([idx[-1], len(mask)])
return r
@abc.abstractmethod
def _get_param(self, idx, pol, polyorder, nchan, mask, edge, nchan_without_edge, nchan_masked, fragment, nwindow,
win_polyorder, masklist):
raise NotImplementedError
[docs]class CubicSplineFitParamConfig(BaselineFitParamConfig):
def __init__(self, switchpoly=True):
super(CubicSplineFitParamConfig, self).__init__(switchpoly)
# constant stuff
#self.paramdict[BLP.FUNC] = 'cspline'
self.paramdict[BLP.CLIPNITER] = self.ClipCycle
self.paramdict[BLP.CLIPTHRESH] = 5.0
def _get_param(self, idx, pol, polyorder, nchan, mask, edge, nchan_without_edge, nchan_masked, fragment, nwindow,
win_polyorder, masklist):
num_nomask = nchan_without_edge - nchan_masked
num_pieces = max(int(min(polyorder * num_nomask / float(nchan_without_edge) + 0.5, 0.1 * num_nomask)), 1)
if TRACE():
LOG.trace('Cubic Spline Fit: Number of Sections = {}'.format(num_pieces))
self.paramdict[BLP.ROW] = idx
self.paramdict[BLP.POL] = pol
self.paramdict[BLP.MASK] = masklist
self.paramdict[BLP.NPIECE] = num_pieces
fitfunc, order = self.switching_heuristic(
self.heuristics_engine,
nchan,
edge,
num_pieces,
masklist
)
self.paramdict[BLP.FUNC] = fitfunc
self.paramdict[BLP.ORDER] = order
return self.paramdict