import collections
import itertools
import math
import numpy
from casatasks.private import simutil
from pipeline.infrastructure import casa_tools
from . import measures
Baseline = collections.namedtuple('Baseline', 'antenna1 antenna2 length')
[docs]class AntennaArray(object):
def __init__(self, name, position, antennas=None):
self.__name = name
self.__position = position
if antennas is None:
antennas = []
self.antennas = antennas
def __repr__(self):
return 'AntennaArray({0!r}, {1}, {2!r})'.format(
self.__name,
self.__position,
self.antennas
)
@property
def name(self):
return self.__name
@property
def position(self):
return self.__position
@property
def elevation(self):
"""
Get the array elevation as a CASA quantity.
"""
return self.__position['m2']
@property
def latitude(self):
"""
Get the array latitude as a CASA quantity.
"""
return self.__position['m1']
@property
def longitude(self):
"""
Get the array longitude as a CASA quantity.
"""
return self.__position['m0']
@property
def centre(self):
datum = self.__position['refer']
if datum == 'ITRF':
return self.__position
qa = casa_tools.quanta
longitude = qa.convert(self.longitude, 'rad')
latitude = qa.convert(self.latitude, 'rad')
elevation = qa.convert(self.elevation, 'm')
s = simutil.simutil()
return s.long2xyz(qa.getvalue(longitude)[0],
qa.getvalue(latitude)[0],
qa.getvalue(elevation)[0],
datum)
@property
def min_baseline(self):
return min(self.baselines, key=lambda b: b.length)
@property
def max_baseline(self):
return max(self.baselines, key=lambda b: b.length)
@property
def baselines(self):
qa = casa_tools.quanta
def diff(ant1, ant2, attr):
v1 = qa.getvalue(ant1.offset[attr])[0]
v2 = qa.getvalue(ant2.offset[attr])[0]
return v1-v2
baselines = []
for (ant1, ant2) in itertools.combinations(self.antennas, 2):
raw_length = math.sqrt(diff(ant1, ant2, 'longitude offset')**2 +
diff(ant1, ant2, 'latitude offset')**2 +
diff(ant1, ant2, 'elevation offset')**2)
domain_length = measures.Distance(raw_length,
measures.DistanceUnits.METRE)
baselines.append(Baseline(ant1, ant2, domain_length))
if len(baselines) is 0:
zero_length = measures.Distance(0.0,
measures.DistanceUnits.METRE)
baselines.append(Baseline(self.antennas[0], self.antennas[0], zero_length))
return baselines
[docs] def get_offset(self, antenna):
"""
Get the offset of the given antenna from the centre of the array.
"""
# qa = casa_tools.quanta
# longitude = qa.convert(self.longitude, 'rad')
# latitude = qa.convert(self.latitude, 'rad')
# elevation = qa.convert(self.elevation, 'm')
# datum = self._position['refer']
#
# (cx, cy, cz) = (qa.getvalue(longitude)[0],
# qa.getvalue(latitude)[0],
# qa.getvalue(elevation)[0])
#
# s = simutil.simutil()
# if datum != 'ITRF':
# (cx, cy, cz) = s.long2xyz(cx, cy, cz, datum)
#
# ant_x = qa.getvalue(antenna.position['m0'])[0]
# ant_y = qa.getvalue(antenna.position['m1'])[0]
# ant_z = qa.getvalue(antenna.position['m2'])[0]
#
# # As of CASA 4.2, itrf2loc also returns the elevation offset, but we
# # discard it.
# xs, ys, _ = s.itrf2loc((ant_x,), (ant_y,), (ant_z,), cx, cy, cz)
#
# x_offset = measures.Distance(xs[0], measures.DistanceUnits.METRE)
# y_offset = measures.Distance(ys[0], measures.DistanceUnits.METRE)
dx = antenna.offset['longitude offset']['value']
dy = antenna.offset['latitude offset']['value']
x_offset = measures.Distance(dx, measures.DistanceUnits.METRE)
y_offset = measures.Distance(dy, measures.DistanceUnits.METRE)
return x_offset, y_offset
[docs] def get_baseline(self, antenna1, antenna2):
try:
int(antenna1)
int(antenna2)
attr_getter = lambda antenna: antenna.id
except ValueError:
attr_getter = lambda antenna: antenna.name
matching = [b for b in self.baselines
if attr_getter(b.antenna1) in (antenna1, antenna2)
and attr_getter(b.antenna2) in (antenna1, antenna2)]
if matching:
return matching[0]
return None
[docs] def add_antenna(self, antenna):
self.antennas.append(antenna)
[docs] def get_antenna(self, id=None, name=None):
if id is not None:
l = [ant for ant in self.antennas if ant.id == id]
if not l:
raise IndexError('No antenna with ID {0}'.format(id))
return l[0]
if name is not None:
l = [ant for ant in self.antennas if ant.name == name]
if not l:
raise IndexError('No antenna with name {0}'.format(name))
return l[0]
raise Exception('No id or name given to get_antenna')
@property
def median_direction(self):
"""The median center direction for the array."""
# construct lists of the longitude and latitude of each antenna..
qt = casa_tools.quanta
longs = [qt.getvalue(antenna.longitude) for antenna in self.antennas]
lats = [qt.getvalue(antenna.latitude) for antenna in self.antennas]
# .. and find the median of these lists
med_lon = numpy.median(numpy.array(longs))
med_lat = numpy.median(numpy.array(lats))
# Construct and return a CASA direction using these median values. As
# antenna positions are given in radians, the units of the median
# direction is set to radians too.
mt = casa_tools.measures
return mt.direction(v0=qt.quantity(med_lon, 'rad'),
v1=qt.quantity(med_lat, 'rad'))
def __str__(self):
names = ', '.join([antenna.name for antenna in self.antennas])
return 'AntennaArray({0})'.format(names)