import numpy
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.api as api
LOG = infrastructure.get_logger(__name__)
[docs]class FitOrderHeuristics(api.Heuristic):
"""
Determine fitting order from a set of spectral data.
"""
MaxDominantFreq = 15
[docs] def calculate(self, data, mask=None, edge=(0, 0)):
"""
Determine fitting order from a set of spectral data, data,
with masks for each spectral data, mask, and number of edge
channels to be excluded, edge.
First, manipulate each spectral data by the following procedure:
1) mask regions specified by mask and edge,
2) subtract average from spectral data,
3) compute one-dimensional discrete Fourier Transform.
Then, Fourier power spectrum is averaged and averaged power
spectrum is analyzed to determine optimal polynomial order
for input data array. The heuristics returns one representative
polynomial order per input data array.
data: two-dimensional data array with shape (nrow, nchan).
mask: list of mask regions. Value should be a list of
[[start0,end0],[start1,end1],...] for each spectrum.
[[-1,-1]] indicates no mask. Default is None.
edge: number of edge channels to be dropped. Default is (0,0).
"""
(nrow, nchan) = data.shape
effective_nchan = nchan - sum(edge)
power_spectrum = []
if mask is not None:
mask_maker = MaskMaker(nchan, mask, edge)
else:
mask_maker = MaskMakerNoLine(nchan, edge)
for irow in range(nrow):
spectrum = data[irow]
flag = mask_maker.get_mask(irow)
average = (spectrum * flag).sum() / float(flag.sum())
spectrum = (spectrum - average) * flag
# Apply FFT to the spectrum
power_spectrum.append(numpy.abs(numpy.fft.rfft(spectrum)))
# Average seems to be better than median
#power = numpy.median(power_spectrum, axis=0)
power = numpy.average(power_spectrum, axis=0)
max_freq = max(int(self.MaxDominantFreq * effective_nchan / 2048.0), 1)
# 2007/09/01 Absolute value of power should be taken into account
# If the power is low, it should be ignored
# Normalize the power
power2 = power / power.mean()
max_power = power2[:max_freq].max()
if max_power < 3.0:
poly_order = 1.0
elif max_power < 5.0:
poly_order = 1.5
elif max_power < 10.0:
poly_order = 2.0
else:
flag = False
for i in range(max_freq, -1, -1):
if power2[i] > 10.0:
break
if power2[i] > 5.0:
flag = True
if flag is True:
poly_order = float(max(2.0, i)) + 0.5
else:
poly_order = float(max(2.0, i))
# Finally, convert to polynomial order
#poly_order = int(poly_order * 3.0)
#poly_order = int(poly_order + 1) * 2.0 + 0.5)
poly_order = int(poly_order * 2.0 + 1.5)
return poly_order
[docs]class MaskMakerNoLine(object):
def __init__(self, nchan, edge):
self.flag = numpy.ones(nchan)
self.flag[:edge[0]] = 0
self.flag[(nchan-edge[1]):] = 0
[docs] def get_mask(self, row):
return self.flag
[docs]class MaskMaker(MaskMakerNoLine):
def __init__(self, nchan, lines, edge):
super(MaskMaker, self).__init__(nchan, edge)
self.lines = lines
[docs] def get_mask(self, row):
flag = self.flag.copy()
for line in self.lines[row]:
if line[0] != -1:
flag[line[0]:line[1]] = 0
return flag
[docs]class SwitchPolynomialWhenLargeMaskAtEdgeHeuristic(api.Heuristic):
[docs] def calculate(self, nchan, edge, num_pieces, masklist):
# fit function heuristics
# nchan: total number of channels
# nchan_segment: number of channels in one segment
# edge: number of channels from the edges to be excluded from the fit [C0, C1]
# mask: mask array (0->rejected, 1->adopted)
# masklist: list of fit ranges (included in the fit) [[C0, C1], [C2, C3], ...]
# nchan_edge: max number of consecutive masked channels from edges
# if nchan_edge >= nchan/2:
# fitfunc='poly'
# order=1
# elif nchan_edge >= nchan_segment:
# fitfunc='poly'
# order=2
# else:
# fitfunc='cspline'
if len(masklist) == 0:
# special case: all channels are excluded from the fit
nchan_edge0 = nchan
nchan_edge1 = nchan
else:
# number of masked edge channels: Left side
edge_mask0 = list(map(min, masklist))
assert edge[0] >= 0
nchan_edge0 = max(min(edge_mask0), edge[0]) if len(edge_mask0) > 0 else edge[0]
# number of masked edge channels: Right side
edge_mask1 = list(map(max, masklist))
assert edge[1] >= 0
nchan_edge1 = max(nchan - 1 - max(edge_mask1), edge[1]) if len(edge_mask1) > 0 else edge[1]
# merge result
nchan_edge = max(nchan_edge0, nchan_edge1)
nchan_segment = int(round(float(nchan) / num_pieces))
nchan_half = nchan // 2 + nchan % 2
nchan_quarter = nchan // 4 + (3 + nchan % 4) // 4
if nchan_edge >= nchan_half:
fitfunc = 'poly'
order = 1
elif nchan_edge >= nchan_quarter:
fitfunc = 'poly'
order = 2
else:
fitfunc = 'cspline'
order = 0 # not used
LOG.debug('DEBUGGING INFORMATION:')
LOG.debug('inclusive masklist={}'.format(masklist))
LOG.debug('edge = {}'.format(list(edge)))
LOG.debug('nchan_edge = {} (left {} right {})'.format(nchan_edge, nchan_edge0, nchan_edge1))
LOG.debug('nchan = {}, num_pieces = {} => nchan_segment = {}, nchan_half = {}'.format(nchan, num_pieces, nchan_segment, nchan_half))
LOG.debug('---> RESULT: fitfunc "{}" order "{}"'.format(fitfunc, order))
return fitfunc, order