Source code for pipeline.domain.antennaarray

import collections
import itertools
import math

import numpy

from casatasks.private import simutil

from pipeline.infrastructure import casa_tools
from . import measures

Baseline = collections.namedtuple('Baseline', 'antenna1 antenna2 length')


[docs]class AntennaArray(object): def __init__(self, name, position, antennas=None): self.__name = name self.__position = position if antennas is None: antennas = [] self.antennas = antennas def __repr__(self): return 'AntennaArray({0!r}, {1}, {2!r})'.format( self.__name, self.__position, self.antennas ) @property def name(self): return self.__name @property def position(self): return self.__position @property def elevation(self): """ Get the array elevation as a CASA quantity. """ return self.__position['m2'] @property def latitude(self): """ Get the array latitude as a CASA quantity. """ return self.__position['m1'] @property def longitude(self): """ Get the array longitude as a CASA quantity. """ return self.__position['m0'] @property def centre(self): datum = self.__position['refer'] if datum == 'ITRF': return self.__position qa = casa_tools.quanta longitude = qa.convert(self.longitude, 'rad') latitude = qa.convert(self.latitude, 'rad') elevation = qa.convert(self.elevation, 'm') s = simutil.simutil() return s.long2xyz(qa.getvalue(longitude)[0], qa.getvalue(latitude)[0], qa.getvalue(elevation)[0], datum) @property def min_baseline(self): return min(self.baselines, key=lambda b: b.length) @property def max_baseline(self): return max(self.baselines, key=lambda b: b.length) @property def baselines(self): qa = casa_tools.quanta def diff(ant1, ant2, attr): v1 = qa.getvalue(ant1.offset[attr])[0] v2 = qa.getvalue(ant2.offset[attr])[0] return v1-v2 baselines = [] for (ant1, ant2) in itertools.combinations(self.antennas, 2): raw_length = math.sqrt(diff(ant1, ant2, 'longitude offset')**2 + diff(ant1, ant2, 'latitude offset')**2 + diff(ant1, ant2, 'elevation offset')**2) domain_length = measures.Distance(raw_length, measures.DistanceUnits.METRE) baselines.append(Baseline(ant1, ant2, domain_length)) if len(baselines) is 0: zero_length = measures.Distance(0.0, measures.DistanceUnits.METRE) baselines.append(Baseline(self.antennas[0], self.antennas[0], zero_length)) return baselines
[docs] def get_offset(self, antenna): """ Get the offset of the given antenna from the centre of the array. """ # qa = casa_tools.quanta # longitude = qa.convert(self.longitude, 'rad') # latitude = qa.convert(self.latitude, 'rad') # elevation = qa.convert(self.elevation, 'm') # datum = self._position['refer'] # # (cx, cy, cz) = (qa.getvalue(longitude)[0], # qa.getvalue(latitude)[0], # qa.getvalue(elevation)[0]) # # s = simutil.simutil() # if datum != 'ITRF': # (cx, cy, cz) = s.long2xyz(cx, cy, cz, datum) # # ant_x = qa.getvalue(antenna.position['m0'])[0] # ant_y = qa.getvalue(antenna.position['m1'])[0] # ant_z = qa.getvalue(antenna.position['m2'])[0] # # # As of CASA 4.2, itrf2loc also returns the elevation offset, but we # # discard it. # xs, ys, _ = s.itrf2loc((ant_x,), (ant_y,), (ant_z,), cx, cy, cz) # # x_offset = measures.Distance(xs[0], measures.DistanceUnits.METRE) # y_offset = measures.Distance(ys[0], measures.DistanceUnits.METRE) dx = antenna.offset['longitude offset']['value'] dy = antenna.offset['latitude offset']['value'] x_offset = measures.Distance(dx, measures.DistanceUnits.METRE) y_offset = measures.Distance(dy, measures.DistanceUnits.METRE) return x_offset, y_offset
[docs] def get_baseline(self, antenna1, antenna2): try: int(antenna1) int(antenna2) attr_getter = lambda antenna: antenna.id except ValueError: attr_getter = lambda antenna: antenna.name matching = [b for b in self.baselines if attr_getter(b.antenna1) in (antenna1, antenna2) and attr_getter(b.antenna2) in (antenna1, antenna2)] if matching: return matching[0] return None
[docs] def add_antenna(self, antenna): self.antennas.append(antenna)
[docs] def get_antenna(self, id=None, name=None): if id is not None: l = [ant for ant in self.antennas if ant.id == id] if not l: raise IndexError('No antenna with ID {0}'.format(id)) return l[0] if name is not None: l = [ant for ant in self.antennas if ant.name == name] if not l: raise IndexError('No antenna with name {0}'.format(name)) return l[0] raise Exception('No id or name given to get_antenna')
@property def median_direction(self): """The median center direction for the array.""" # construct lists of the longitude and latitude of each antenna.. qt = casa_tools.quanta longs = [qt.getvalue(antenna.longitude) for antenna in self.antennas] lats = [qt.getvalue(antenna.latitude) for antenna in self.antennas] # .. and find the median of these lists med_lon = numpy.median(numpy.array(longs)) med_lat = numpy.median(numpy.array(lats)) # Construct and return a CASA direction using these median values. As # antenna positions are given in radians, the units of the median # direction is set to radians too. mt = casa_tools.measures return mt.direction(v0=qt.quantity(med_lon, 'rad'), v1=qt.quantity(med_lat, 'rad')) def __str__(self): names = ', '.join([antenna.name for antenna in self.antennas]) return 'AntennaArray({0})'.format(names)