import contextlib
import csv
import io
import numpy
import pipeline.infrastructure as infrastructure
LOG = infrastructure.get_logger(__name__)
[docs]def read(context, filename):
"""
Reads jyperk factors from a file and returns a string list
of [['MS','ant','spwid','polid','factor'], ...]
"""
filetype = inspect_type(filename)
if filetype == 'MS-Based':
LOG.debug('MS-Based Jy/K factors file is specified')
return read_ms_based(filename)
else:
LOG.debug('Session-Based Jy/K factors file is specified')
return read_session_based(context, filename)
[docs]def inspect_type(filename):
with open(filename, 'r') as f:
line = f.readline()
if len(line) > 0 and line[0] == '#':
return 'Session-Based'
else:
return 'MS-Based'
[docs]def read_ms_based(reffile):
#factor_list = []
with open(reffile, 'r') as f:
return list(_read_stream(f))
[docs]def read_session_based(context, reffile):
parser = JyPerKDataParser
storage = JyPerK()
with open(reffile, 'r') as f:
for line in f:
if line[0] == '#':
# header
meta = parser.parse_header(line)
storage.register_meta(meta)
else:
# data
data = parser.parse_data(line)
storage.register_data(data)
with associate(context, storage) as f:
return list(_read_stream(f))
def _read_stream(stream):
reader = csv.reader(stream)
# Check if first line is header or not
line = next(reader)
if len(line) == 0 or line[0].strip().upper() == 'MS' or line[0].strip()[0] == '#':
# must be a header, commented line, or empty line
pass
elif len(line) == 5:
# may be a data record
#factor_list.append(line)
yield line
else:
LOG.error('Jy/K factor file is invalid format')
for line in reader:
if len(line) == 0 or len(line[0]) == 0 or line[0][0] == '#':
continue
elif len(line) == 5:
yield line
else:
LOG.error('Jy/K factor file is invalid format')
# Utility classes/functions to convert session based factors file
# to MS based one are defined below
[docs]class JyPerKDataParser(object):
[docs] @classmethod
def get_content(cls, line):
return line.strip('# \n\t')
[docs] @classmethod
def parse_data(cls, line):
content = cls.get_content(line)
if content.find(',') != -1:
# data
return content.split(',')
else:
# invalid or empty line, ignored
return None
[docs]class JyPerK(object):
"""
Parse session based jyperk csv and store.
* meta stores meta data information from the lines in the form, '#name=value',
as a dictionary, meta[name]=value.
* header stores column label from the line in the form '#header0, header1, ...'
as a list, header = ['header0', 'header1', ...]
* data stores values in csv file as a dictionary,
data['header0'] = [data00, data01, ...]
"""
def __init__(self):
self.meta = dict()
self.header = []
self.data = []
[docs] def register_data(self, content):
assert len(self.header) > 0
assert len(self.header) == len(content)
for (k, v) in zip(self.header, content):
self.data[k].append(v)
[docs]@contextlib.contextmanager
def associate(context, factors):
"""
Convert data collected from session based jyperk csv as JyPerK object
to MS-beased csv, i.e., a string list of ['MS,ant,spwid,polid,factor', ...]
"""
stream = io.StringIO()
try:
data = factors.data
for ms in context.observing_run.measurement_sets:
session_name = ms.session
if session_name == 'Session_default':
# Session_default is not supported, use Session_1 instead
LOG.warn('Session for %s is \'Session_default\'. Use \'Session_1\' for application of Jy/K factor. ' %
ms.basename)
session_id = 1
else:
# session_name should be 'Session_X' where X is an integer
session_id = int(session_name.split('_')[-1])
session_list = numpy.array([int(x) for x in data['sessionID']])
idx = numpy.where(session_list == session_id)
antennas = [x.name for x in ms.antennas]
antenna_list = data['Antenna']
factor_list = numpy.array([float(x) for x in data['Factor']])
spws = ms.get_spectral_windows()
bandcenter = numpy.array([float(x) * 1.0e6 for x in data['BandCenter(MHz)']])
bandwidth = numpy.array([float(x) * 1.0e6 for x in data['BandWidth(MHz)']])
range_min = bandcenter - 0.5 * bandwidth
range_max = bandcenter + 0.5 * bandwidth
for spw in spws:
max_freq = float(spw.max_frequency.value)
min_freq = float(spw.min_frequency.value)
tot_bandwidth = float(spw.bandwidth.value)
spwid = spw.id
d = {}
for i in idx[0]:
#coverage = inspect_coverage(min_freq, max_freq, range_min[i], range_max[i])
antenna = antenna_list[i]
if antenna in d:
#d[antenna].append([i, coverage, bandwidth[i]])
d[antenna].append(i)
else:
#d[antenna] = [[i, coverage, bandwidth[i]]]
d[antenna] = [i]
for ant in antennas:
if ant in d:
f = d[ant]
else:
LOG.info('%s: No factors available for spw %s antenna %s use ANONYMOUS' %
(session_name, spwid, ant))
f = d['ANONYMOUS']
coverage_list = [inspect_coverage(min_freq, max_freq, range_min[x], range_max[x]) for x in f]
#_best_index = numpy.argmax(coverage_list)
best_index = f[0]
_best_score = inspect_coverage(min_freq, max_freq, range_min[f[0]], range_max[f[0]])
for _i in f[1:]:
coverage = inspect_coverage(min_freq, max_freq, range_min[_i], range_max[_i])
if coverage > _best_score:
best_index = _i
_best_score = coverage
line = '%s,%s,%s,%s,%s' % (ms.basename, ant, spwid, data['POL'][best_index],
factor_list[best_index])
LOG.debug(line)
stream.write(line + '\n')
stream.seek(0, 0)
yield stream
finally:
stream.close()
[docs]def inspect_coverage(minval, maxval, minref, maxref):
if minval > maxval or minref > maxref:
return 0.0
coverage = (min(maxval, maxref) - max(minval, minref)) / (maxval - minval)
bandwidth_ratio = (maxref - minref) / (maxval - minval)
if coverage < 0.0 or coverage > 1.0 or bandwidth_ratio > 1.1:
return 0.0
return coverage