#*******************************************************************************
# ALMA - Atacama Large Millimeter Array
# Copyright (c) ATC - Astronomy Technology Center - Royal Observatory Edinburgh, 2011
# (in the framework of the ALMA collaboration).
# All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#*******************************************************************************
import numpy as np
import pipeline.infrastructure.api as api
# from asap.asaplinefind import linefinder
# from asap import _asap, scantable, rcParams
[docs]class HeuristicsLineFinder(api.Heuristic):
"""
"""
[docs] def calculate(self, spectrum, threshold=7.0, min_nchan=3, avg_limit=2, box_size=2, tweak=False, mask=[], edge=None):
_spectrum = np.array(spectrum)
if len(mask) == 0:
mask = np.ones(len(_spectrum), dtype=np.int)
else:
mask = np.array(mask, np.int)
if edge is not None:
if len(edge) == 1:
if edge[0] != 0:
mask[:edge[0]] = 0
mask[-edge[0]:] = 0
_edge = (edge[0], len(_spectrum)-edge[0])
else:
_edge = (0, len(_spectrum))
else:
mask[:edge[0]] = 0
if edge[1] != 0:
mask[-edge[1]:] = 0
_edge = (edge[0], len(_spectrum)-edge[1])
else:
_edge = (0, len(_spectrum))
indeces = np.arange(len(_spectrum))
previous_line_indeces = np.array([], np.int)
previous_mad = 1e6
iteration = 0
max_iteration = 10
while iteration <= max_iteration:
iteration += 1
iteration_mask = np.array(mask)
iteration_mask[previous_line_indeces] = 0
variances = np.abs(_spectrum - np.median(_spectrum[iteration_mask==1]))
good_variances = sorted(variances[iteration_mask == 1])
good_variances = good_variances[:int(0.8*len(good_variances))]
mad = np.median(good_variances)
#line_indeces = indeces[np.logical_and(mask==1, variances > 7*mad)]
line_indeces = indeces[np.logical_and(mask==1, variances > threshold*mad)]
if mad > previous_mad or list(line_indeces) == list(previous_line_indeces):
break
else:
previous_mad = mad
previous_line_indeces = np.array(line_indeces)
ranges = []
range_start = None
range_end = None
for i in previous_line_indeces:
if range_start is None:
range_start = i
range_end = i
elif i == range_end + 1:
range_end = i
# elif range_end - range_start + 1 > 2:
elif range_end - range_start + 1 > 1:
ranges += [range_start, range_end]
range_start = i
range_end = i
else:
range_start = i
range_end = i
if range_start is not None and (range_end - range_start + 1 > 2):
ranges += [range_start, range_end]
if tweak:
ranges = self.tweak_lines(_spectrum, ranges, _edge)
#return len(ranges)/2
return ranges
[docs] def tweak_lines(self, spectrum, ranges, edge, n_ignore=1):
"""
"""
med = np.median(spectrum)
mask = np.array(spectrum) >= med
for i in range(0, len(ranges), 2):
if spectrum[ranges[i]] > med:
# Emission Feature
Mask = True
else:
# Absorption Feature
Mask = False
ignore = 0
for j in range(ranges[i], edge[0]-1, -1):
if (spectrum[j]-spectrum[j+1] > 0) == Mask:
ignore += 1
if (mask[j] != Mask) or (ignore > n_ignore):
ranges[i] = j
break
ignore = 0
for j in range(ranges[i+1], edge[1]):
if (spectrum[j]-spectrum[j-1] > 0) == Mask:
ignore += 1
if (mask[j] != Mask) or (ignore > n_ignore):
ranges[i+1] = j
break
return ranges