Source code for pipeline.hsd.heuristics.grouping2

"""Set of heuristics for data grouping."""
from numbers import Real
from typing import Dict, List, NewType, Sequence, Tuple, Union

import numpy as np

import pipeline.infrastructure.api as api
import pipeline.infrastructure as infrastructure
from pipeline.infrastructure import casa_tools

LOG = infrastructure.get_logger(__name__)

Angle = NewType('Angle', Union[float, int, Dict])


[docs]class GroupByPosition2(api.Heuristic): """Grouping by RA/DEC position."""
[docs] def calculate(self, ra: np.ndarray, dec: np.ndarray, r_combine: Angle, r_allowance: Angle) -> Tuple[Dict, List]: """Group data by RA/DEC position. Divides data into groups by their positions in two dimensional space which are given by ra and dec. Groups data within the circle with the radius of r_combine. Only position difference larger than r_allowance are regarded as significant. For r_combine and r_allowance, specified values are interpreted to be in degree unless their units are explicitly given. Args: ra: List of R.A. dec: List of DEC. r_combine: Data inside r_combine will be grouped together. r_allowance: Data inside r_allowance are assumed to be the same position. Returns: Two-tuple containing information on group membership (PosDict) and boundaries between groups (PosGap). PosDict is a dictionary whose keys are indices for ra and dec. Values of PosDict are the list which contains different value depending on whether the position specified by the index is reference data for the group or not. If k is reference index, PosDict[k] lists indices for group member ([ID1, ID2,..., IDN]). Otherwise, PosDict[k] is [-1, m] where m is the index to reference data. PosGap is a list of gaps in terms of array indices for ra and dec ([IDX1, IDX2,...,IDXN]). Length of PosGap is (number of groups) - 1. """ qa = casa_tools.quanta if isinstance(r_combine, dict): # r_combine should be quantity CombineRadius = qa.convert(r_combine, 'deg')['value'] else: CombineRadius = r_combine if isinstance(r_allowance, dict): # r_allowance should be quantity AllowanceRadius = qa.convert(r_allowance, 'deg')['value'] else: AllowanceRadius = r_allowance ThresholdAR = AllowanceRadius * AllowanceRadius CombineDiameter = 2.0 * CombineRadius # 2009/2/10 Quicker Method Nrows = len(ra) PosDict = {} SelectDict = {} MinRA = ra.min() MinDEC = dec.min() # Calculate the lattice position (sRA, sDEC) for each pointings # Create list of pointings (dictionary) for each lattice position for x in range(Nrows): sRA = int((ra[x] - MinRA) / CombineDiameter) sDEC = int((dec[x] - MinDEC) / CombineDiameter) if sRA not in SelectDict: SelectDict[sRA] = {} if sDEC not in SelectDict[sRA]: SelectDict[sRA][sDEC] = [x] else: SelectDict[sRA][sDEC].append(x) # Create PosDict # make a list of spectra inside each lattice grid # store the list in PosDict[i] where 'i' is the smallest row number in the list # Other spectra have a reference to 'i' LOG.debug('SelectDict.keys() : %s' % list(SelectDict.keys())) for sRA in SelectDict: LOG.debug('len(SelectDict[%s].keys()) : %s' % (sRA, len(list(SelectDict[sRA].keys())))) for sDEC in SelectDict[sRA]: PosDict[SelectDict[sRA][sDEC][0]] = SelectDict[sRA][sDEC] if len(SelectDict[sRA][sDEC]) != 1: for x in SelectDict[sRA][sDEC][1:]: PosDict[x] = [-1, SelectDict[sRA][sDEC][0]] del SelectDict # Calculate thresholds to determine gaps DeltaP = np.sqrt((ra[1:] - ra[:-1])**2.0 + (dec[1:] - dec[:-1])**2.0) DeltaQ = np.take(DeltaP, np.nonzero(DeltaP > ThresholdAR)[0]) if len(DeltaQ) != 0: ThresholdP = np.median(DeltaQ) * 10.0 else: ThresholdP = 0.0 LOG.info('threshold:%s deg' % ThresholdP) # List rows which distance from the previous position is larger than the threshold PosGap = [] for x in range(1, Nrows): if DeltaP[x - 1] > ThresholdP: PosGap.append(x) LOG.info('Position Gap %s deg at row=%d' % (DeltaP[x-1], x)) # Print information if len(PosGap) == 0: PosGapMsg = 'Found no position gap' else: PosGapMsg = 'Found %d position gap(s)' % len(PosGap) LOG.info(PosGapMsg) #print '\nPosGap', PosGap #print '\nPosDict', PosDict return PosDict, PosGap
[docs]class GroupByTime2(api.Heuristic): """Grouping by time sequence."""
[docs] def calculate(self, timebase: Sequence[Real], time_diff: Sequence[Real]) -> Tuple[List, List]: """Group data by time sequence. Divides data into groups by their difference (time_diff). Two groups are defined based on "small" and "large" gaps, which are internally computed by ThresholdForGroupByTime heuristic. The time_diff is generated from timebase in most of the cases. The timebase contains all time stamps and time_diff is created from selected time stamps in other case. Args: timebase: base list of time stamps for threshold estimation time_diff: difference from the previous time stamp Returns: Two-tuple containing information on group membership (TimeTable) and boundaries between groups (TimeGap). TimeTable is the "list-of-list" whose items are the set of indices for each group. TimeTable[0] is the groups separaged by "small" gap while TimeTable[1] is for groups separated by "large" gap. They are used for baseline subtraction (hsd_baseline) and subsequent flagging (hsd_blflag). TimeTable: [[[ismall00,...,ismall0M],[...],...,[ismallX0,...,ismallXN]], [[ilarge00,...,ilarge0P],[...],...,[ilargeY0,...,ilargeYQ]]] TimeTable[0]: separated by small gaps TimeTable[1]: separated by large gaps TimeGap is the list of indices which indicate boundaries for "small" and "large" gaps. These are used for plotting. TimeGap: [[rowX1, rowX2,...,rowXN], [rowY1, rowY2,...,rowYN]] TimeGap[0]: small gap TimeGap[1]: large gap """ LOG.info('Grouping by Time...') # Threshold for grouping h = ThresholdForGroupByTime() (Threshold1, Threshold2) = h(timebase) TimeTable = [[], []] SubTable1 = [0] SubTable2 = [0] TimeGap = [[], []] # Detect small and large time gaps for index in range(len(time_diff)): indexp1 = index + 1 if time_diff[index] <= Threshold1: SubTable1.append(indexp1) else: TimeTable[0].append(SubTable1) SubTable1 = [indexp1] TimeGap[0].append(indexp1) LOG.info('Small Time Gap: %s sec at row=%d' % (time_diff[index], indexp1)) if time_diff[index] <= Threshold2: SubTable2.append(indexp1) else: TimeTable[1].append(SubTable2) SubTable2 = [indexp1] TimeGap[1].append(indexp1) LOG.info('Large Time Gap: %s sec at row=%d' % (time_diff[index], indexp1)) if len(SubTable1) > 0: TimeTable[0].append(SubTable1) if len(SubTable2) > 0: TimeTable[1].append(SubTable2) del SubTable1, SubTable2 # print information if len(TimeGap[0]) == 0: TimeGapMsg = 'Found no time gap' LOG.info(TimeGapMsg) else: TimeGapMsg1 = 'Found %d small time gap(s)' % len(TimeGap[0]) TimeGapMsg2 = 'Found %d large time gap(s)' % len(TimeGap[1]) LOG.info(TimeGapMsg1) LOG.info(TimeGapMsg2) #print '\nTimeGap', TimeGap #print '\nTimeTable', TimeTable return TimeTable, TimeGap
# TimeGap is index # TimeTable[][0] is row, TimeTable[][1] is index
[docs]class ThresholdForGroupByTime(api.Heuristic): """Estimate thresholds for large and small time gaps."""
[docs] def calculate(self, timebase: Sequence[Real]) -> Tuple[List, List]: """Estimate thresholds for large and small time gaps. Estimate thresholds for large and small time gaps using base list of time stamps. Threshold for small time gap, denoted as Threshold1, is computed from a median value of nonzero time differences multiplied by five, i.e., dt = timebase[1:] - timebase[:-1] Threhold1 = 5 * np.median(dt[dt != 0]) where timebase is assumed to be np.ndarray. Threshold for large time gap, denoted as Threshold2, is computed from a median value of time differences larger than Threshold1 mutiplied by five, i.e., Threshold2 = 5 * np.median( dt[np.logical_and(dt != 0, dt > Threshold1)] ) Args: timebase: base list of time stamps for threshold estimation Returns: Two-tuple of threshold values for small and large time gaps, respectively. """ ArrayTime = np.array(timebase) # 2009/2/5 adapted for multi beam which assumes to have identical time stamps # identical time stamps are rejected before determining thresholds # DeltaT: difference from the previous time stamp DeltaT = ArrayTime[1:] - ArrayTime[:-1] DeltaT1 = np.take(DeltaT, np.nonzero(DeltaT)[0]) Threshold1 = np.median(DeltaT1) * 5.0 DeltaT2 = np.take(DeltaT1, np.nonzero(DeltaT1 > Threshold1)[0]) if len(DeltaT2) > 0: Threshold2 = np.median(DeltaT2) * 5.0 else: Threshold2 = Threshold1 # print information LOG.info('Threshold1 = %s sec' % Threshold1) LOG.info('Threshold2 = %s sec' % Threshold2) LOG.info('MaxDeltaT = %s sec' % DeltaT1.max()) LOG.info('MinDeltaT = %s sec' % DeltaT1.min()) return Threshold1, Threshold2
[docs]class MergeGapTables2(api.Heuristic): """Merge time gap and position gaps."""
[docs] def calculate(self, TimeGap: List, TimeTable: List, PosGap: List, tBEAM: Sequence[int]) -> Tuple[List, List]: """Merge time gap and position gaps. Merge time gap list (TimeGap) and position gap list (PosGap). TimeTable and TimeGap should be the first and the second elements of the return value of GroupByTime2 heuristic. Also, PosGap should be the second element of the return value of GroupByPosition2 heuristic. PosGap is merged into small TimeGap (TimeGap[0]). tBEAM is used to separate the data by beam for multi-beam data. Args: TimeTable: the first element of output from GroupByTime2 heuristic TimeGap: the second element of output from GroupByTime2 heuristic PosGap: the second element of output from GroupByPosition2() tBEAM: list of beam identifier. Returns: Two-tuple containing information on group membership (TimeTable) and boundaries between groups (TimeGap). TimeTable is the "list-of-list" whose items are the set of indices for each group. TimeTable[0] is the groups separaged by "small" gap while TimeTable[1] is for groups separated by "large" gap. They are used for baseline subtraction (hsd_baseline) and subsequent flagging (hsd_blflag). TimeTable: [[[ismall00,...,ismall0M],[...],...,[ismallX0,...,ismallXN]], [[ilarge00,...,ilarge0P],[...],...,[ilargeY0,...,ilargeYQ]]] TimeTable[0]: separated by small gaps TimeTable[1]: separated by large gaps TimeGap is the list of indices which indicate boundaries for "small" and "large" gaps. The "small" gap is a merged list of gaps for groups separated by small time gaps and the ones grouped by positions. These are used for plotting. TimeGap: [[rowX1, rowX2,...,rowXN], [rowY1, rowY2,...,rowYN]] TimeGap[0]: small gap TimeGap[1]: large gap """ LOG.info('Merging Position and Time Gap tables...') idxs = [] for i in range(len(TimeTable[0])): idxs += TimeTable[0][i] IDX = list(np.sort(np.array(idxs))) tmpGap = list(np.sort(np.array(TimeGap[0] + PosGap))) NewGap = [] if len(tmpGap) != 0: t = n = tmpGap[0] for n in tmpGap[1:]: if t != n and t in IDX: NewGap.append(t) t = n if n in IDX: NewGap.append(n) TimeGap[0] = NewGap SubTable1 = [] TimeTable[0] = [] for index in range(len(IDX)): n = IDX[index] if n in TimeGap[0]: TimeTable[0].append(SubTable1) SubTable1 = [n] LOG.info('Small Time Gap at row=%d' % n) else: SubTable1.append(n) if len(SubTable1) > 0: TimeTable[0].append(SubTable1) # 2009/2/6 Divide TimeTable in accordance with the Beam TimeTable2 = TimeTable[:] TimeTable = [[], []] for i in range(len(TimeTable2)): for index in range(len(TimeTable2[i])): #rows = TimeTable2[i][index][0] idxs = TimeTable2[i][index] BeamDict = {} for index2 in range(len(idxs)): #row = rows[index2] idx = idxs[index2] if tBEAM[idx] in BeamDict: #BeamDict[tBEAM[row]][0].append(row) BeamDict[tBEAM[idx]].append(idx) else: BeamDict[tBEAM[idx]] = [idx] BeamList = list(BeamDict.values()) for beam in BeamList: TimeTable[i].append(beam) #print TimeTable[0] del BeamDict, BeamList, TimeTable2 LOG.debug('TimeTable = %s' % (TimeTable)) #print '\nTimeGap', TimeGap #print '\nTimeTable', TimeTable return TimeTable, TimeGap