import decimal
import itertools
import operator
import numpy
import pipeline.infrastructure as infrastructure
from pipeline.infrastructure import casa_tools
from . import measures
LOG = infrastructure.get_logger(__name__)
[docs]class ArithmeticProgression(object):
"""
A representation of an arithmetic progression that can generate sequence
elements on demand.
"""
__slots__ = ('start', 'delta', 'num_terms')
def __getstate__(self):
return self.start, self.delta, self.num_terms
def __setstate__(self, state):
self.start, self.delta, self.num_terms = state
def __init__(self, start, delta, num_terms):
self.start = start
self.delta = delta
self.num_terms = num_terms
def __iter__(self):
g = itertools.count(self.start, self.delta)
return itertools.islice(g, 0, self.num_terms)
def __len__(self):
return self.num_terms
def __getitem__(self, index):
if abs(index) >= self.num_terms:
raise IndexError
if index < 0:
index += self.num_terms
return self.start + index * self.delta
[docs]def compress(values):
"""
Compress (if possible) a sequence of values.
If the numbers in the given list constitute an arithmetic progression,
return an ArithmeticProgression object that summarises it as such. If
the list cannot be summarised as a simple arithmetic progression,
return the list as given.
"""
deltas = set(numpy.diff(values))
if len(deltas) is 1:
delta = deltas.pop()
return ArithmeticProgression(values[0], delta, len(values))
else:
return values
[docs]class ChannelList(object):
"""
A container/generator for Channel objects.
A spectral window can contain thousands of channels. Rather than store all of
these objects, a ChannelList generates and returns them lazily, on-demand.
"""
def __init__(self, chan_freqs, chan_widths, effbw):
assert len(chan_freqs) == len(chan_widths) == len(effbw)
self.chan_freqs = chan_freqs
self.chan_widths = chan_widths
self.chan_effbws = effbw
def __iter__(self):
raw_channel_data = list(zip(self.chan_freqs, self.chan_widths, self.chan_effbws))
for chan_centre, chan_width, chan_effective_bw in raw_channel_data:
yield self.__create_channel(chan_centre, chan_width, chan_effective_bw)
def __len__(self):
return len(self.chan_freqs)
def __getitem__(self, index):
return self.__create_channel(self.chan_freqs[index],
self.chan_widths[index],
self.chan_effbws[index])
def __create_channel(self, centre, width, effective_bw):
dec_centre = decimal.Decimal(centre)
dec_width = decimal.Decimal(width)
# The abs() call is not strictly necessary as Channel extends
# FrequencyRange, whose .set() method swaps low and high when
# low > high.
delta = abs(dec_width) / decimal.Decimal(2)
f_lo = measures.Frequency(dec_centre - delta,
measures.FrequencyUnits.HERTZ)
f_hi = measures.Frequency(dec_centre + delta,
measures.FrequencyUnits.HERTZ)
f_bw = measures.Frequency(effective_bw,
measures.FrequencyUnits.HERTZ)
return Channel(f_lo, f_hi, f_bw)
[docs]class Channel(object):
"""
Representation of a channel within a spectral window.
This object can be considered as a FrequencyRange object plus an effective
bandwidth property. It provides the same interface as a FrequencyRange.
"""
__slots__ = ('frequency_range', 'effective_bw')
def __getstate__(self):
return self.frequency_range, self.effective_bw
def __setstate__(self, state):
self.frequency_range, self.effective_bw = state
def __init__(self, start, end, effective_bw):
self.frequency_range = measures.FrequencyRange(start, end)
self.effective_bw = effective_bw
def __eq__(self, other):
if not isinstance(other, Channel):
return False
if other.frequency_range != self.frequency_range:
return False
if other.effective_bw != self.effective_bw:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return 'Channel(%s, %s, %s)' % (self.frequency_range.low,
self.frequency_range.high,
self.effective_bw)
@property
def low(self):
return self.frequency_range.low
@property
def high(self):
return self.frequency_range.high
[docs] def contains(self, frequency):
return self.frequency_range.contains(frequency)
[docs] def convert_to(self, newUnits=measures.FrequencyUnits.GIGAHERTZ):
return self.frequency_range.convert_to(newUnits)
[docs] def getCentreFrequency(self):
return self.frequency_range.getCentreFrequency()
[docs] def getOverlapWith(self, other):
return self.frequency_range.getOverlapWith(other)
[docs] def getGapBetween(self, other):
return self.frequency_range.getGapBetween(other)
[docs] def getWidth(self):
return self.frequency_range.getWidth()
[docs] def overlaps(self, other):
return self.frequency_range.overlaps(other)
[docs] def set(self, frequency1, frequency2):
self.frequency_range.set(frequency1, frequency2)
[docs]class SpectralWindow(object):
"""
SpectralWindow is a logical representation of a spectral window.
.. py:attribute:: id
the numerical identifier of this spectral window within the
SPECTRAL_WINDOW subtable of the measurement set
.. py:attribute:: channels
the number of channels
.. py:attribute:: bandwidth
the total bandwidth
.. py:attribute:: ref_frequency
the reference frequency
.. py:attribute:: chan_width
the channel width
.. py:attribute:: intents
the observing intents that have been observed using this spectral
window
"""
__slots__ = ('id', 'band', 'bandwidth', 'type', 'intents', 'ref_frequency', 'name', 'baseband', 'sideband',
'receiver', 'freq_lo', 'mean_frequency', '_min_frequency', '_max_frequency', '_centre_frequency',
'channels', '_ref_frequency_frame', 'transitions')
def __getstate__(self):
return (self.id, self.band, self.bandwidth, self.type, self.intents, self.ref_frequency, self.name,
self.baseband, self.sideband, self.receiver, self.freq_lo, self.mean_frequency, self._min_frequency,
self._max_frequency, self._centre_frequency, self.channels, self._ref_frequency_frame, self.transitions)
def __setstate__(self, state):
(self.id, self.band, self.bandwidth, self.type, self.intents, self.ref_frequency, self.name, self.baseband,
self.sideband, self.receiver, self.freq_lo, self.mean_frequency, self._min_frequency, self._max_frequency,
self._centre_frequency, self.channels, self._ref_frequency_frame, self.transitions) = state
def __repr__(self):
chan_freqs = self.channels.chan_freqs
if isinstance(chan_freqs, ArithmeticProgression):
chan_freqs = numpy.array(list(chan_freqs))
chan_widths = self.channels.chan_widths
if isinstance(chan_widths, ArithmeticProgression):
chan_widths = numpy.array(list(chan_widths))
chan_effective_bws = self.channels.chan_effbws
if isinstance(chan_effective_bws, ArithmeticProgression):
chan_effective_bws = numpy.array(list(chan_effective_bws))
return ('SpectralWindow({0!r}, {1!r}, {2!r}, {3!r}, {4!r}, {5!r}, {6}, {7}, {8}, {9!r}, {10!r}, {11!r}, '
'{12!r})').format(
self.id,
self.name,
self.type,
float(self.bandwidth.to_units(measures.FrequencyUnits.HERTZ)),
dict(m0={'unit': 'Hz',
'value': float(self.ref_frequency.to_units(measures.FrequencyUnits.HERTZ))},
refer=self._ref_frequency_frame,
type='frequency'),
float(self.mean_frequency.to_units(measures.FrequencyUnits.HERTZ)),
'numpy.array(%r)' % chan_freqs.tolist(),
'numpy.array(%r)' % chan_widths.tolist(),
'numpy.array(%r)' % chan_effective_bws.tolist(),
self.sideband,
self.baseband,
self.band,
self.transitions
)
def __init__(self, spw_id, name, spw_type, bandwidth, ref_freq, mean_freq, chan_freqs, chan_widths,
chan_effective_bws, sideband, baseband, receiver, freq_lo, band='Unknown', transitions=None):
if transitions is None:
transitions = ['Unknown']
self.id = spw_id
self.bandwidth = measures.Frequency(bandwidth, measures.FrequencyUnits.HERTZ)
ref_freq_hz = casa_tools.quanta.convertfreq(ref_freq['m0'], 'Hz')
ref_freq_val = casa_tools.quanta.getvalue(ref_freq_hz)[0]
self.ref_frequency = measures.Frequency(ref_freq_val, measures.FrequencyUnits.HERTZ)
self._ref_frequency_frame = ref_freq['refer']
self.mean_frequency = measures.Frequency(mean_freq, measures.FrequencyUnits.HERTZ)
self.band = band
self.type = spw_type
self.intents = set()
# work around NumPy bug with empty strings
# http://projects.scipy.org/numpy/ticket/1239
self.name = str(name)
self.sideband = str(sideband)
self.baseband = str(baseband)
self.receiver = receiver
if freq_lo is not None:
self.freq_lo = [measures.Frequency(freq, measures.FrequencyUnits.HERTZ) for freq in freq_lo]
else:
self.freq_lo = freq_lo
chan_freqs = compress(chan_freqs)
chan_widths = compress(chan_widths)
chan_effective_bws = compress(chan_effective_bws)
self.channels = ChannelList(chan_freqs, chan_widths, chan_effective_bws)
self._min_frequency = min(self.channels, key=lambda r: r.low).low
self._max_frequency = max(self.channels, key=lambda r: r.high).high
self._centre_frequency = (self._min_frequency + self._max_frequency) / 2.0
self.transitions = transitions
@property
def centre_frequency(self):
return self._centre_frequency
[docs] def channel_range(self, minfreq, maxfreq):
"""
# More work on this in future
minfreq -- measures.Frequency object in HERTZ
maxfreq -- measures.Frequency object in HERTZ
"""
freqmin = minfreq
freqmax = maxfreq
# Check for the no overlap case.
nchan = self.num_channels
if freqmax < self.min_frequency:
return None, None
if freqmin > self.max_frequency:
return None, None
# Find the minimum channel
chan_min = 0
if self.channels[0].low < self.channels[nchan-1].low:
for i in range(nchan):
if self.channels[i].low > freqmin:
break
chan_min = i
else:
for i in range(nchan):
if self.channels[i].high < freqmax:
break
chan_min = i
# Find the maximum channel
chan_max = nchan - 1
if self.channels[0].low < self.channels[nchan-1].low:
for i in range(nchan-1, -1, -1):
if self.channels[i].high < freqmax:
break
chan_max = i
else:
for i in range(nchan-1, -1, -1):
if self.channels[i].low > freqmin:
break
chan_max = i
return chan_min, chan_max
@property
def frame(self):
return self._ref_frequency_frame
@property
def min_frequency(self):
return self._min_frequency
@property
def max_frequency(self):
return self._max_frequency
@property
def num_channels(self):
return len(self.channels)
def __str__(self):
args = [str(x) for x in [self.id, self.centre_frequency, self.bandwidth, self.type]]
return 'SpectralWindow({0})'.format(', '.join(args))
[docs]class SpectralWindowWithChannelSelection(object):
"""
SpectralWindowWithChannelSelection decorates a SpectralWindow so that the
spectral window ID also contains a channel selection.
"""
def __init__(self, subject, channels):
self._subject = subject
channels = sorted(list(channels))
# prepare a string representation of the number of channels. If all
# channels are specified for this spw, just specify the spw in the
# string representation
if set(channels).issuperset(set(range(0, subject.num_channels))):
ranges = []
else:
ranges = []
for _, g in itertools.groupby(enumerate(channels), lambda i_x: i_x[0] - i_x[1]):
rng = list(map(operator.itemgetter(1), g))
if len(rng) == 1:
ranges.append('%s' % rng[0])
else:
ranges.append('%s~%s' % (rng[0], rng[-1]))
self._channels = ';'.join(ranges)
def __getattr__(self, name):
return getattr(self._subject, name)
@property
def id(self):
channels = ':%s' % self._channels if self._channels else ''
return '{spw}{channels}'.format(spw=self._subject.id,
channels=channels)