import collections
import datetime
import json
import os
import re
import string
import urllib
import urllib.error
import urllib.parse
import urllib.request

import numpy

import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
from pipeline.infrastructure import casa_tools
LOG = infrastructure.get_logger(__name__)
# One DB query: 'param' holds the URL query parameters (passed to
# urllib.parse.urlencode) and 'subparam' is caller-side context that is
# carried through unchanged to the matching ResponseStruct.
QueryStruct = collections.namedtuple('QueryStruct', ['param', 'subparam'])
# One decoded DB response paired with the 'subparam' of the QueryStruct
# that produced it.
ResponseStruct = collections.namedtuple('ResponseStruct', ['response', 'subparam'])
class ALMAJyPerKDatabaseAccessBase(object):
    BASE_URL = 'https://asa.alma.cl/science/jy-kelvins'
    ENDPOINT_TYPE = None
    # Timeout (seconds) passed to urlopen for every DB request.
    TIMEOUT = 400

    @property
    def url(self):
        """Access URL of the endpoint: BASE_URL/ENDPOINT_TYPE/ (always ends with '/')."""
        assert self.ENDPOINT_TYPE is not None, \
            '{} cannot be instantiated. Please use subclasses.'.format(self.__class__.__name__)
        s = '/'.join([self.BASE_URL, self.ENDPOINT_TYPE])
        if not s.endswith('/'):
            s += '/'
        return s

    def __init__(self, context=None):
        """
        ALMAJyPerKDatabaseAccessBase is a base class for accessing the Jy/K
        DB to retrieve conversion factors for ALMA TP data.

        ALMAJyPerKDatabaseAccessBase is a kind of template class that
        only provides a standard workflow to get a list of conversion
        factors. Each access class must inherit this class and
        implement/override some methods according to the target API.
        Subclasses must implement the attributes and methods listed below:

            ENDPOINT_TYPE (class attribute): String naming the API
                          endpoint. It is appended to BASE_URL to form
                          the access URL.
            access (method): Receive the queries as a generator,
                          access the DB through the generator, and
                          return the formatted response. The return value
                          should be a dictionary with 'query', 'data',
                          and 'total' fields. The 'query' field holds
                          the query while the 'data' field stores the
                          response. The 'total' field is the number of
                          responses. Each item of the 'data' field should
                          consist of a single conversion factor ('factor';
                          'Factor' is also accepted) with the meta-data
                          'MS', 'Antenna', 'Spwid', 'Polarization'.
            get_params (method): Receive a name of the MS and generate
                          the query parameters (QueryStruct objects).
                          Required parameters depend on the API.

        Keyword Arguments:
            context {Context} -- Pipeline Context object (default: {None})
        """
        self.context = context

    def _get_observing_band(self, ms):
        """Return the unique band names of the science spws of *ms*, or 'Unknown' without a context."""
        if self.context is None:
            return 'Unknown'
        spws = ms.get_spectral_windows(science_windows_only=True)
        bands = [spw.band for spw in spws]
        return numpy.unique(bands)

    def _generate_query(self, url, params):
        """
        Send each query to the DB and yield the decoded responses.

        Arguments:
            url {str} -- Endpoint URL the encoded query string is appended to
            params {iterable} -- QueryStruct objects

        Yields:
            ResponseStruct -- Decoded JSON response with the query's subparam

        Raises:
            urllib.error.HTTPError, urllib.error.URLError -- logged and re-raised
            RuntimeError -- if the DB response reports no success
        """
        for p in params:
            encoded = urllib.parse.urlencode(p.param)
            query = '?'.join([url, encoded])
            LOG.info('Accessing Jy/K DB: query is "{}"'.format(query))
            try:
                # the context manager closes the HTTP response once it is parsed
                with urllib.request.urlopen(query, timeout=self.TIMEOUT) as response:
                    retval = json.load(response)
            # HTTPError is a subclass of URLError, so it must be handled first
            except urllib.error.HTTPError as e:
                msg = 'Failed to load URL: {0}\n'.format(query) \
                    + 'Error Message: HTTPError(code={0}, Reason="{1}")\n'.format(e.code, e.reason)
                LOG.warning(msg)
                raise
            except urllib.error.URLError as e:
                msg = 'Failed to load URL: {0}\n'.format(query) \
                    + 'Error Message: URLError(Reason="{0}")\n'.format(e.reason)
                LOG.warning(msg)
                raise
            if not retval['success']:
                msg = 'Failed to get a Jy/K factor from DB: {}'.format(retval.get('error'))
                LOG.warning(msg)
                raise RuntimeError(msg)
            yield ResponseStruct(response=retval, subparam=p.subparam)

    def validate(self, vis):
        """
        Check that *vis* is registered to the context and is ALMA data.

        Raises:
            KeyError -- if *vis* is not registered to the context
            RuntimeError -- if the antenna array is not 'ALMA'
        """
        basename = os.path.basename(vis.rstrip('/'))
        try:
            ms = self.context.observing_run.get_ms(vis)
        except KeyError:
            LOG.error('{} is not registered to context'.format(basename))
            raise
        array_name = ms.antenna_array.name
        if array_name != 'ALMA':
            raise RuntimeError('{} is not ALMA data'.format(basename))

    def getJyPerK(self, vis):
        """
        getJyPerK returns list of Jy/K conversion factors with their
        meta data (MS name, antenna name, spwid, and pol string).

        Arguments:
            vis {str} -- Name of MS

        Returns:
            [list] -- List of Jy/K conversion factors with meta data
        """
        # sanity check
        self.validate(vis)
        # get Jy/K value from DB
        jyperk = self.get(vis)
        # convert to pipeline-friendly format
        formatted = self.format_jyperk(vis, jyperk)
        # keep only entries for this MS and its science spws
        return self.filter_jyperk(vis, formatted)

    def get_params(self, vis):
        raise NotImplementedError

    def access(self, queries):
        raise NotImplementedError

    def get(self, vis):
        """
        Access Jy/K DB and return its response.

        Arguments:
            vis {str} -- Name of MS

        Raises:
            urllib.error.HTTPError
            urllib.error.URLError
            RuntimeError -- if the DB reports a failure

        Returns:
            [dict] -- Response from the DB as a dictionary. It should contain
                      the following keys:
                          'query' -- query data
                          'total' -- number of data
                          'data' -- data
        """
        url = self.url
        params = self.get_params(vis)
        queries = self._generate_query(url, params)
        return self.access(queries)

    def format_jyperk(self, vis, jyperk):
        """
        Convert the dictionary returned by get() into a list of records.

        Each record is [MS, Antenna, Spwid, Polarization, factor] with all
        fields converted to str, which is the layout filter_jyperk expects
        (five items, MS name first, spw id third).

        Arguments:
            vis {str} -- Name of MS; its basename is used when an item
                         has no 'MS' field
            jyperk {dict} -- Response from get(); 'data' is a list of dicts
                             with 'Antenna', 'Spwid', 'Polarization' and
                             'factor' (or 'Factor') keys

        Returns:
            [list] -- List of five-element lists of str
        """
        basename = os.path.basename(vis.rstrip('/'))
        formatted = []
        for item in jyperk['data']:
            # access() implementations use 'factor'; 'Factor' is accepted too
            factor = item['factor'] if 'factor' in item else item['Factor']
            formatted.append([str(item.get('MS', basename)),
                              str(item['Antenna']),
                              str(item['Spwid']),
                              str(item['Polarization']),
                              str(factor)])
        return formatted

    def filter_jyperk(self, vis, factors):
        """
        Keep only records that belong to *vis* and to one of its science spws.

        Arguments:
            vis {str} -- Name of MS
            factors {list} -- Records as produced by format_jyperk

        Returns:
            [list] -- Records with five fields whose MS name equals the MS
                      basename and whose spw id is a science spw
        """
        ms = self.context.observing_run.get_ms(vis)
        science_windows = {x.id for x in ms.get_spectral_windows(science_windows_only=True)}
        # the length check comes first so int() is never applied to short records
        return [i for i in factors
                if (len(i) == 5) and (i[0] == ms.basename) and (int(i[2]) in science_windows)]
class JyPerKAbstractEndPoint(ALMAJyPerKDatabaseAccessBase):
    """
    Common workflow for endpoints that are queried once per
    (antenna, science spw) pair.

    Subclasses set ENDPOINT_TYPE and implement _extract_factor. They may
    add endpoint-specific query parameters by overriding _aux_params.
    """

    def get_params(self, vis):
        """
        Generate one query per (antenna, science spw) pair of *vis*.

        Query parameters, in insertion order: 'date', 'temperature', any
        returned by _aux_params(), 'antenna', 'elevation', 'band',
        'baseband' and 'frequency' (mean spw frequency in Hz).

        Arguments:
            vis {str} -- Name of MS

        Yields:
            QueryStruct -- param is a dict owned by that query alone;
                           subparam is {'vis': vis, 'spwid': spw.id}
        """
        ms = self.context.observing_run.get_ms(vis)

        # parameters shared by every query for this MS
        common_params = {}
        common_params['date'] = mjd_to_datestring(ms.start_time)
        common_params['temperature'] = get_mean_temperature(vis)
        common_params.update(self._aux_params())

        # loop-invariant: the science spws do not depend on the antenna
        science_spws = list(ms.get_spectral_windows(science_windows_only=True))

        for ant in ms.antennas:
            antenna_params = dict(common_params)
            antenna_params['antenna'] = ant.name
            antenna_params['elevation'] = get_mean_elevation(self.context, vis, ant.id)
            for spw in science_spws:
                # A fresh dict per query, so consumers that keep earlier
                # QueryStructs (e.g. list(get_params(vis))) are not affected
                # by later iterations.
                params = dict(antenna_params)
                # observing band is taken from the string spw.band
                # whose format should be "ALMA Band X"
                params['band'] = int(spw.band.split()[-1])
                params['baseband'] = int(spw.baseband)
                params['frequency'] = get_mean_frequency(spw)
                subparam = {'vis': vis, 'spwid': spw.id}
                yield QueryStruct(param=params, subparam=subparam)

    def access(self, queries):
        """
        Collect one factor per response into the standard result dict.

        Arguments:
            queries {iterable} -- ResponseStruct objects whose subparam is
                                  {'vis': str, 'spwid': int}

        Returns:
            [dict] -- {'query': '', 'data': [...], 'total': len(data)}, where
                      each data item has 'MS', 'Antenna', 'Spwid',
                      'Polarization' (always 'I') and 'factor' keys
        """
        data = []
        for result in queries:
            response = result.response
            subparam = result.subparam
            assert isinstance(subparam, dict)
            assert ('vis' in subparam) and ('spwid' in subparam)
            spwid = subparam['spwid']
            assert isinstance(spwid, int)
            vis = subparam['vis']
            assert isinstance(vis, str)
            basename = os.path.basename(vis.rstrip('/'))
            factor = self._extract_factor(response)
            polarization = 'I'
            # the antenna name is echoed back in the response's query section
            antenna = response['query']['antenna']
            data.append({'MS': basename, 'Antenna': antenna, 'Spwid': spwid,
                         'Polarization': polarization, 'factor': factor})
        return {'query': '', 'data': data, 'total': len(data)}

    def _aux_params(self):
        """Endpoint-specific extra query parameters (none by default)."""
        return {}

    def _extract_factor(self, response):
        """Return the conversion factor from a decoded response; subclasses must implement."""
        raise NotImplementedError
class JyPerKAsdmEndPoint(ALMAJyPerKDatabaseAccessBase):
    """Endpoint that fetches every factor for one ASDM with a single query."""

    ENDPOINT_TYPE = 'asdm'

    def get_params(self, vis):
        """Yield the single query for *vis*, keyed by its ASDM uid; the MS name is the subparam."""
        asdm_uid = vis_to_uid(vis)
        yield QueryStruct(param={'uid': asdm_uid}, subparam=vis)

    def access(self, queries):
        """
        Reshape the single DB response in place into the standard result dict.

        'total' is taken from data.length and 'data' is replaced by
        data.factors.
        """
        collected = list(queries)
        # exactly one query is generated by get_params
        assert len(collected) == 1
        payload = collected[0].response
        db_data = payload['data']
        payload['total'] = db_data['length']
        payload['data'] = db_data['factors']
        return payload
class JyPerKModelFitEndPoint(JyPerKAbstractEndPoint):
    """Per-antenna, per-spw endpoint served under 'model-fit'."""

    ENDPOINT_TYPE = 'model-fit'

    def _extract_factor(self, response):
        # the factor is stored directly under data.factor
        payload = response['data']
        return payload['factor']
class JyPerKInterpolationEndPoint(JyPerKAbstractEndPoint):
    """Per-antenna, per-spw endpoint served under 'interpolation'."""

    ENDPOINT_TYPE = 'interpolation'

    def _aux_params(self):
        # extra parameter sent only to this endpoint
        # NOTE(review): presumably a search window in days — confirm with the API docs
        return dict(delta_days=1000)

    def _extract_factor(self, response):
        # this endpoint nests the value: data.factor.mean
        factor_info = response['data']['factor']
        return factor_info['mean']
def vis_to_uid(vis):
    """
    Convert MS name like uid___A002_Xabcd_X012.ms into uid://A002/Xabcd/X012.

    Any leading directory and trailing '/' of *vis* are ignored.

    Arguments:
        vis {str} -- Name of MS

    Raises:
        RuntimeError: if the MS basename is not of the form
                      uid___A<3 digits>_X<hex>_X<hex>.ms (lowercase hex)

    Returns:
        str -- Corresponding ASDM uid
    """
    basename = os.path.basename(vis.rstrip('/'))
    # Use a raw string, since '\.' is an invalid escape in a plain literal.
    # fullmatch also rejects a trailing newline, which '$' would accept.
    pattern = r'uid___A[0-9][0-9][0-9]_X[0-9a-f]+_X[0-9a-f]+\.ms'
    if re.fullmatch(pattern, basename) is None:
        raise RuntimeError('MS name is not appropriate for DB query: {}'.format(basename))
    stem = basename[:-len('.ms')]
    return stem.replace('___', '://').replace('_', '/')
def mjd_to_datestring(epoch):
    """
    Format a CASA epoch measure as a 'YYYY-MM-DDThh:mm:ss' UTC string.

    Epochs whose reference frame is not 'UTC' are first converted with the
    measures tool. Sub-second precision is dropped from the output.
    """
    measures_tool = casa_tools.measures
    quanta_tool = casa_tools.quanta

    if epoch['refer'] == 'UTC':
        utc_epoch = epoch
    else:
        try:
            utc_epoch = measures_tool.measure(epoch, 'UTC')
        finally:
            # release the measures tool even if the conversion fails
            measures_tool.done()

    parts = quanta_tool.splitdate(utc_epoch['m0'])
    field_order = ('year', 'month', 'monthday', 'hour', 'min', 'sec', 'usec')
    moment = datetime.datetime(*(parts[key] for key in field_order))
    return moment.strftime('%Y-%m-%dT%H:%M:%S')
def get_mean_frequency(spw):
    """Return the mean frequency of *spw* in Hz as a Python float."""
    mean_freq_hz = spw.mean_frequency.convert_to(measures.FrequencyUnits.HERTZ)
    return float(mean_freq_hz.value)
def get_mean_temperature(vis):
    """Return the mean of the unflagged TEMPERATURE values in the WEATHER table of *vis*."""
    weather_table = os.path.join(vis, 'WEATHER')
    with casa_tools.TableReader(weather_table) as tb:
        temperature = tb.getcol('TEMPERATURE')
        flag = tb.getcol('TEMPERATURE_FLAG')
        # flagged samples are masked out of the average
        unflagged = numpy.ma.masked_array(temperature, mask=flag)
    return unflagged.mean()
def get_mean_elevation(context, vis, antenna_id):
    """
    Return the mean of the 'EL' column for one antenna from the DataTable.

    Reads the 'RO' sub-table of the MS's DataTable (under
    context.observing_run.ms_datatable_name) and selects rows with
    ANTENNA == antenna_id and SRCTYPE == 0.
    NOTE(review): SRCTYPE == 0 presumably selects ON-source rows — confirm.

    Arguments:
        context {Context} -- Pipeline Context object
        vis {str} -- Name of MS
        antenna_id {int} -- Antenna ID

    Raises:
        AssertionError -- if no row matches the selection
    """
    dt_name = context.observing_run.ms_datatable_name
    basename = os.path.basename(vis.rstrip('/'))
    with casa_tools.TableReader(os.path.join(dt_name, basename, 'RO')) as tb:
        # Query before entering try/finally. Otherwise a failing query would
        # leave 't' unbound and the finally clause would raise
        # UnboundLocalError, hiding the original error.
        t = tb.query('ANTENNA=={}&&SRCTYPE==0'.format(antenna_id))
        try:
            assert t.nrows() > 0, \
                'No rows for antenna {} with SRCTYPE==0 in {}'.format(antenna_id, basename)
            elevations = t.getcol('EL')
        finally:
            t.close()
    return elevations.mean()
def translate_spw(data, ms):
    """
    Translate ASDM spw ids in *data* records into MS spw ids by spw name.

    For each record, the ASDM spw name is looked up in the
    ASDM_SPECTRALWINDOW table of the MS. That name is matched against the
    MS science spw names, preferring the FULL_RES variant and falling back
    to the CH_AVG variant. Records without a matching science spw are
    dropped. Matched records are copied, with 'Spwid' replaced by the MS
    spw id formatted as a string.

    Arguments:
        data {list} -- Records (dicts) with an ASDM 'Spwid'
        ms {MeasurementSet} -- Target MS

    Returns:
        [list] -- Translated copies of the matched records
    """
    def _name_candidates(name):
        # (full-resolution name, channel-averaged name) to try, in that order
        if name.endswith('CH_AVG'):
            return name.replace('CH_AVG', 'FULL_RES'), name
        if name.endswith('FULL_RES'):
            return name, name.replace('FULL_RES', 'CH_AVG')
        return name, name

    science_windows = numpy.asarray(ms.get_spectral_windows(science_windows_only=True))
    asdm_table = os.path.join(ms.name, 'ASDM_SPECTRALWINDOW')
    with casa_tools.TableReader(asdm_table) as tb:
        asdm_ids = tb.getcol('spectralWindowId')
        asdm_names = tb.getcol('name')

    science_window_names = numpy.asarray([sw.name for sw in science_windows])
    result = []
    LOG.info('Translate ASDM Spws to MS Spws:')
    for record in data:
        asdm_spw_id = record['Spwid']
        hits = asdm_names[numpy.where(asdm_ids == 'SpectralWindow_{}'.format(asdm_spw_id))]
        assert len(hits) == 1
        asdm_spw_name = hits[0]

        full_res_name, chan_avg_name = _name_candidates(asdm_spw_name)
        match = numpy.where(science_window_names == full_res_name)
        if len(match[0]) == 0:
            match = numpy.where(science_window_names == chan_avg_name)
        if len(match[0]) == 0:
            # no science spw with this name: the record is dropped
            continue

        matched_spws = science_windows[match]
        assert len(matched_spws) == 1
        ms_spw = matched_spws[0]
        converted = record.copy()
        converted['Spwid'] = '{}'.format(ms_spw.id)
        result.append(converted)
        LOG.info(' * ASDM Spw {} (name {})'.format(asdm_spw_id, asdm_spw_name))
        LOG.info(' -> MS Spw {} (name {})'.format(ms_spw.id, ms_spw.name))
    return result