"""pipeline.hsd.tasks.importdata.reader: read MS metadata into the single dish DataTable."""
import collections
import glob
import itertools
import os
import shutil
import string
import numpy
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.logging as logging
from pipeline.domain.datatable import DataTableImpl as DataTable
from pipeline.domain.datatable import DataTableColumnMaskList as ColMaskList
from pipeline.domain.datatable import OnlineFlagIndex
from pipeline.hsd.tasks.common import mjd_to_datestring, TableSelector
from pipeline.infrastructure import casa_tools
from ..common import rasterutil
from ..common import direction_utils as dirutil
LOG = infrastructure.get_logger(__name__)
def get_value_in_deg(quantity):
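    """Return the numerical value of ``quantity`` converted to degrees."""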
qa = casa_tools.quanta
return qa.getvalue(qa.convert(quantity, 'deg'))
def get_state_id(ms, spw, intent):
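    """Return STATE_IDs of ``ms`` that match the given spw selection and intent."""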
states = (s for s in ms.states if intent in s.intents)
obs_modes = set()
for s in states:
modes = set(s.get_obs_mode_for_intent(intent))
obs_modes.update(modes)
state_ids = set()
with casa_tools.MSReader(ms.name) as msreader:
for obs_mode in obs_modes:
msreader.msselect({'spw': spw, 'scanintent': obs_mode}, onlyparse=True)
indices = msreader.msselectedindices()
state_ids.update(indices['stateid'])
# Need to reset explicitly after CASA 5.3. See CAS-11088 for detail.
msreader.selectinit(reset=True)
return numpy.fromiter(state_ids, dtype=numpy.int32)
def merge_timerange(timerange_list):
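    """Merge time ranges separated by gaps of 0.1 msec or less.

    ``timerange_list`` is assumed to be a time-sorted list of [start, end]
    pairs in seconds.
    """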
timegap_list = numpy.asarray([l1[0] - l0[1] for l0, l1 in zip(timerange_list, timerange_list[1:])])
LOG.info(f'timegap_list is {timegap_list}')
# regard timegap <= 0.1msec as continuous
gap_index = [-1] + numpy.where(timegap_list > 1e-4)[0].tolist() + [len(timegap_list)]
timerange_merged = [[timerange_list[i + 1][0], timerange_list[j][1]] for i, j in zip(gap_index, gap_index[1:])]
LOG.info(f'timerange_merged is {timerange_merged}')
return timerange_merged
def initialize_template(flagtemplate):
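    """Remove any existing flag template file and write a fresh header."""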
# remove existing template file
if os.path.exists(flagtemplate):
os.remove(flagtemplate)
# initialize template file with header
with open(flagtemplate, 'w') as f:
f.write('# This template file is auto-generated by hsd_importdata to flag MS rows based on pointing data \n')
def write_flagcmd(flagtemplate, antenna_id, timerange_list, reason=''):
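    """Append manual flag commands for ``antenna_id`` to the flag template file.

    Each entry of ``timerange_list`` is a [start, end] pair of MJD in seconds.
    """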
qa = casa_tools.quanta
sanitized = reason.replace(' ', '_')
template = string.Template(f"mode='manual' antenna='{antenna_id}&&&' timerange='$timerange' reason='SDPL:{sanitized}'\n")
with open(flagtemplate, 'a') as f:
for timerange in timerange_list:
# timerange is [start, end] list of MJD [sec]
rangestr = '~'.join(map(lambda t: '{year}/{month}/{monthday}/{hour}:{min}:{s:.7f}'.format(**qa.splitdate(qa.quantity(t, 's'))), timerange))
f.write(template.safe_substitute(timerange=rangestr))
def set_nominal_direction(ant, srctype, az, el, ra, dec,
shift_ra, shift_dec, offset_ra, offset_dec):
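    """Replace NaN entries of the direction arrays in place.

    For each (antenna, srctype) pair, NaN values are replaced with the median
    of the valid values for that selection.
    """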
# check if there are NaN's
isvalid = numpy.logical_not(numpy.isnan(az))
if numpy.all(isvalid):
        # no NaN's so nothing to do
        LOG.debug('no NaN')
return
for _a, _s in itertools.product(set(ant), set(srctype)):
# select data
        LOG.debug(f'ant {_a} src {_s}')
sel = numpy.logical_and(ant == _a, srctype == _s)
if numpy.all(numpy.logical_not(sel)):
# no data for this selection
            LOG.debug('no data')
continue
# separate NaN's
mask = numpy.logical_and(isvalid, sel)
if numpy.all(mask[sel]):
# no NaN's for this selection
            LOG.debug('no NaN for this selection')
continue
nanmask = numpy.logical_and(numpy.logical_not(isvalid), sel)
        _az = numpy.median(az[mask])
        LOG.debug('nominal az {}'.format(_az))
        _el = numpy.median(el[mask])
        LOG.debug('nominal el {}'.format(_el))
        _ra = numpy.median(ra[mask])
        LOG.debug('nominal ra {}'.format(_ra))
        _dec = numpy.median(dec[mask])
        LOG.debug('nominal dec {}'.format(_dec))
        _shift_ra = numpy.median(shift_ra[mask])
        LOG.debug('nominal shift_ra {}'.format(_shift_ra))
        _shift_dec = numpy.median(shift_dec[mask])
        LOG.debug('nominal shift_dec {}'.format(_shift_dec))
        _offset_ra = numpy.median(offset_ra[mask])
        LOG.debug('nominal offset_ra {}'.format(_offset_ra))
        _offset_dec = numpy.median(offset_dec[mask])
        LOG.debug('nominal offset_dec {}'.format(_offset_dec))
az[nanmask] = _az
el[nanmask] = _el
ra[nanmask] = _ra
dec[nanmask] = _dec
shift_ra[nanmask] = _shift_ra
shift_dec[nanmask] = _shift_dec
offset_ra[nanmask] = _offset_ra
offset_dec[nanmask] = _offset_dec
class MetaDataReader(object):
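    """Read metadata of a measurement set into the single dish DataTable.

    Per-row metadata (time, exposure, pointing direction, spw, antenna,
    field, flags, etc.) are extracted from the MS and stored in the
    DataTable. Flag commands are generated for rows with missing pointing
    data (PIPE-646) and for uniform image rms (PIPE-647).
    """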
def __init__(self, context, ms, table_name):
"""
context -- pipeline context
        ms -- measurement set domain object
table_name -- name of DataTable
"""
self.context = context
self.ms = ms
self.table_name = table_name
# existing table should be the one generated by the previous run
# so it must be overwritten
if os.path.exists(self.table_name):
shutil.rmtree(self.table_name)
self.datatable = DataTable(name=self.table_name, readonly=False)
self.vAnt = 0
self.appended_row = 0
# invalid_pointing_data is dictionary (collections.defaultdict)
        # it stores information on the data that caused
        # msmd.pointingdirection to fail due to the lack of
        # pointing data in the POINTING table
# key: antenna id
# value: list of DataTable rows for invalid data
self.invalid_pointing_data = collections.defaultdict(list)
        # initialize the flag template for pointing (any existing file is removed)
ms_prefix = os.path.splitext(self.ms.basename)[0]
self.flagtemplate = os.path.join(self.context.output_dir, f'{ms_prefix}.flagpointing.txt')
initialize_template(self.flagtemplate)
@property
def name(self):
return self.ms.name
    def detect_target_spw(self):
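        """Return the list of science spectral window ids."""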
if not hasattr(self, 'name'):
return []
ms = self.ms
spws = ms.get_spectral_windows(science_windows_only=True)
return [x.id for x in spws]
    def detect_target_data_desc(self):
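        """Return data description ids corresponding to the science spectral windows."""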
science_windows = self.detect_target_spw()
ms = self.ms
def _g():
for spwid in science_windows:
dd = ms.get_data_description(spw=spwid)
assert dd is not None
yield dd.id
dds = numpy.fromiter(_g(), dtype=numpy.int32)
return dds
    def register_invalid_pointing_data(self, antenna_id, row):
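        """Register a DataTable row whose pointing data are missing for the given antenna."""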
self.invalid_pointing_data[antenna_id].append(row)
    def generate_flagcmd(self):
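        """Generate flag commands for missing pointing data (PIPE-646) and uniform image rms (PIPE-647)."""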
# PIPE-646
# per-antenna row list for the data without pointing data
# key: antenna id
# value: list of rows to be flagged
flagdict1 = self.generate_flagdict_for_invalid_pointing_data()
# PIPE-647
# per-antenna row list for uniform image rms
# key: antenna id
# value: list of rows to be flagged
flagdict2 = self.generate_flagdict_for_uniform_rms()
# do nothing if no rows to be flagged
nflag1 = sum(map(len, flagdict1.values()))
nflag2 = sum(map(len, flagdict2.values()))
if nflag1 + nflag2 == 0:
return
# for debugging
if LOG.isEnabledFor(logging.DEBUG):
merged = collections.defaultdict(list)
for k, v in itertools.chain(flagdict1.items(), flagdict2.items()):
merged[k].extend(v)
for aid, dt_rows in merged.items():
with open(f'toflag_ant{aid}.txt', 'w') as f:
f.write('\n'.join(map(lambda x: str(self.datatable.getcell('ROW', x)), dt_rows)))
f.write('\n')
# generate flag command
self._generate_flagcmd(self.flagtemplate, flagdict1, reason='missing pointing data')
self._generate_flagcmd(self.flagtemplate, flagdict2, reason='uniform image rms')
    def generate_flagdict_for_invalid_pointing_data(self):
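        """Return per-antenna lists of DataTable rows that have no corresponding POINTING data."""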
if len(self.invalid_pointing_data) > 0:
LOG.warn('{}: There are rows without corresponding POINTING data'.format(self.ms.basename))
LOG.warn('Affected antennas are: {}'.format(' '.join(
[self.ms.antennas[k].name for k in self.invalid_pointing_data])))
return self.invalid_pointing_data
    def generate_flagdict_for_uniform_rms(self):
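        """Return per-antenna lists of DataTable rows to be flagged for uniform image rms."""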
# extract necessary metadata from datatable
metadata = rasterutil.read_datatable(self.datatable)
# apply flagging heuristics
flag_list = rasterutil.flag_raster_map(metadata)
# generate dictionary
flag_dict = dict(enumerate(flag_list))
return flag_dict
def _generate_flagcmd(self, flagtemplate, flag_dict, reason=''):
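        """Convert per-antenna row lists into time ranges and write flag commands to the template."""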
datatable = self.datatable
for antenna_id, rowlist in flag_dict.items():
time_list, sort_index = numpy.unique(
[datatable.getcell('TIME', row) for row in rowlist],
return_index=True)
# day -> sec
time_list *= 86400.0
interval_list = [datatable.getcell('EXPOSURE', row) for row in numpy.asarray(rowlist)[sort_index]]
LOG.info(f'antenna {antenna_id}: time_list is {time_list}')
if len(time_list) == 0:
continue
timerange_list = [[t - i / 2, t + i / 2] for t, i in zip(time_list, interval_list)]
LOG.info(f'antenna {antenna_id}: timerange_list is {timerange_list}')
timerange_merged = merge_timerange(timerange_list)
write_flagcmd(flagtemplate, antenna_id, timerange_merged, reason)
    def execute(self, dry_run=True):
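        """Read metadata from the MS and fill the DataTable.

        Does nothing if ``dry_run`` is True. Returns the dictionary of
        reference directions (``org_directions``) collected for ephemeris
        sources.
        """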
if dry_run:
return
# name of the MS
name = self.name
spwids = self.detect_target_spw()
nchan_map = dict([(spwid, self.ms.get_spectral_window(spwid).num_channels) for spwid in spwids])
ddids = self.detect_target_data_desc()
#Rad2Deg = 180. / 3.141592653
# FILENAME keyword stores name of the MS
LOG.info('name=%s' % name)
self.datatable.putkeyword('FILENAME', name)
# 2018/04/18 TN
# CAS-10874 single dish pipeline should use ICRS instead of J2000
        # For consistency throughout the single dish pipeline, the direction
        # reference frame is stored in the DataTable
outref = self._get_outref()
# register direction reference to datatable
self.datatable.direction_ref = outref
azelref = self._get_azelref()
#LOG.info('outref="{0}" azelref="{1}"'.format(outref, azelref))
ms = self.ms
assert ms is not None
spwsel = ','.join(map(str, spwids))
target = 'TARGET'
reference = 'REFERENCE'
assert target in ms.intents
#assert reference in ms.intents
target_states = get_state_id(ms, spwsel, target)
reference_states = get_state_id(ms, spwsel, reference)
state_ids = numpy.concatenate([target_states, reference_states])
target_state_ids = numpy.concatenate([target_states])
# get antenna position list
mpositions = [ a.position for a in ms.antennas ]
# get names of ephemeris sources (excludes 'COMET')
me = casa_tools.measures
direction_codes = me.listcodes( me.direction() )
ephemeris_list = direction_codes['extra']
known_ephemeris_list = numpy.delete( ephemeris_list, numpy.where(ephemeris_list=='COMET') )
# set org_directions
ephemsrc_list = [] # list of ephemsrc names (unique appearance)
ephemsrc_names = {} # ephemsrc name for each field_id
        ephem_tables = {}  # ephemeris table name for each field_id if applicable
        # ephem_tables entry is "" for known ephemeris sources and for non-ephemeris sources
with casa_tools.TableReader(os.path.join(name, 'FIELD')) as tb:
field_ids = list(range(tb.nrows()))
for field_id in list(set(field_ids)):
fields = ms.get_fields( field_id = field_id )
source_name = (fields[0].source.name) # removed upper() 2019.5.14
if 'EPHEMERIS_ID' in tb.colnames():
ephemeris_ids = tb.getcol( 'EPHEMERIS_ID' )
else:
ephemeris_ids = []
# check if known_ephemeris_sources
if source_name.upper() in known_ephemeris_list and not fields[0].source.is_eph_obj:
fields[0].source.is_known_eph_obj = True
else:
fields[0].source.is_known_eph_obj = False
# ephemeris source with ephemeris table
if fields[0].source.is_eph_obj:
ephemsrc_names.update( { field_id:source_name } )
                    if source_name not in ephemsrc_list:
# found a new ephemeris source
ephemsrc_list.append( source_name )
# pick ephemeris table name
ephem_table_files = glob.glob( ms.name+'/FIELD/EPHEM'+str(ephemeris_ids[field_id])+'_*.tab' )
if len(ephem_table_files) > 1:
raise RuntimeError( "multiple ephemeris tables found for field_id={}".format(field_id) )
if len(ephem_table_files) == 1:
ephem_table_file = ephem_table_files[0]
# double check the source name in ephem_table_file
with casa_tools.TableReader(ephem_table_file) as tb2:
keywords = tb2.getkeywords()
if keywords['NAME'] != source_name:
raise RuntimeError( "source name in ephemeris table {0} was {1}, inconsistent with {2}".format( ephem_table_file, keywords['NAME'], source_name ) )
ephem_tables.update( {field_id:ephem_table_file} )
LOG.info( "FIELD_ID={} ({}) with ephemeris table {}".format( field_id, source_name, ephem_table_file ) )
                # known ephemeris source without ephemeris table (not applicable for ALMA)
elif fields[0].source.is_known_eph_obj:
ephemsrc_names.update( { field_id:source_name } )
ephem_tables.update( {field_id:'' } )
LOG.info( "FIELD_ID={} ({}) as KNOWN EPHEMERIS SOURCE".format( field_id, source_name ) )
# non-ephemeris source
else:
ephemsrc_names.update( { field_id:'' } )
ephem_tables.update( {field_id:'' } )
LOG.info( "FIELD_ID={} ({}) as NORMAL SOURCE".format( field_id, source_name) )
with TableSelector(name, 'ANTENNA1 == ANTENNA2 && FEED1 == FEED2 && DATA_DESC_ID IN %s && STATE_ID IN %s'%(list(ddids), list(target_state_ids))) as tb:
# find the first onsrc for each ephemeris source and pack org_directions
org_directions = {}
nrow = tb.nrows()
for irow in range(nrow):
field_id = tb.getcell( 'FIELD_ID', irow )
if field_id not in ephemsrc_names:
raise RuntimeError( "ephemsrc_name for field_id={0} does not exist".format(field_id) )
if ephemsrc_names[field_id] != "":
source_name = ephemsrc_names[field_id]
if source_name not in org_directions:
mjd_in_sec = tb.getcell( 'TIME', irow )
antenna_id = tb.getcell( 'ANTENNA1', irow )
time_meas = tb.getcolkeyword( 'TIME', 'MEASINFO' )
time_frame = time_meas['Ref']
me = casa_tools.measures
qa = casa_tools.quanta
mepoch = me.epoch(rf=time_frame, v0=qa.quantity(mjd_in_sec, 's'))
antennas = self.ms.get_antenna(antenna_id)
assert len(antennas) == 1
antenna_domain = antennas[0]
mposition = antenna_domain.position
fields = ms.get_fields( field_id = field_id )
is_known_eph_obj = fields[0].source.is_known_eph_obj
org_direction = self.get_reference_direction( source_name, ephem_tables[field_id], is_known_eph_obj, mepoch, mposition, outref )
org_directions.update( {source_name:org_direction} );
with casa_tools.TableReader(os.path.join(name, 'FIELD')) as tb:
field_ids = list(range(tb.nrows()))
for field_id in list(set(field_ids)):
fields = ms.get_fields( field_id = field_id )
source_name = fields[0].source.name
if source_name in org_directions:
fields[0].source.org_direction = org_directions[source_name]
LOG.info( "registering org_direction[{}] (field_id={} of {}) as {}".format( source_name, field_id, name, org_directions[source_name] ))
else:
org_direction = None
with TableSelector(name, 'ANTENNA1 == ANTENNA2 && FEED1 == FEED2 && DATA_DESC_ID IN %s && STATE_ID IN %s'%(list(ddids), list(state_ids))) as tb:
nrow = tb.nrows()
rows = tb.rownumbers()
Texpt = tb.getcol('INTERVAL')
Tmjd = tb.getcol('TIME')
time_meas = tb.getcolkeyword('TIME', 'MEASINFO')
time_frame = time_meas['Ref']
Tscan = tb.getcol('SCAN_NUMBER')
TDD = tb.getcol('DATA_DESC_ID')
ddspwmap = numpy.vectorize(lambda x: ms.get_data_description(id=x).spw.id, otypes=[numpy.int32])
ddnpolmap = numpy.vectorize(lambda x: ms.get_data_description(id=x).num_polarizations, otypes=[numpy.int32])
Tif = ddspwmap(TDD)
Tpol = ddnpolmap(TDD)
Tant = tb.getcol('ANTENNA1')
Tbeam = tb.getcol('FEED1')
Tsrctype = numpy.fromiter((0 if i in target_states else 1 for i in tb.getcol('STATE_ID')), dtype=numpy.int32)
Tflagrow = tb.getcol('FLAG_ROW')
Tflag = numpy.fromiter((numpy.all(tb.getcell('FLAG', i) == True) for i in range(nrow)), dtype=bool)
Tflagrow = numpy.logical_or(Tflagrow, Tflag)
field_ids = tb.getcol('FIELD_ID')
getsourcename = numpy.vectorize(lambda x: ms.get_fields(x)[0].source.name, otypes=['str'])
Tsrc = getsourcename(field_ids)
            NchanArray = numpy.fromiter((nchan_map[n] for n in Tif), dtype=int)
ID = len(self.datatable)
LOG.info('ID=%s' % ID)
#ROWs = []
#IDs = []
self.datatable.addrows(nrow)
# column based storing
self.datatable.putcol('ROW', rows, startrow=ID)
self.datatable.putcol('SCAN', Tscan, startrow=ID)
self.datatable.putcol('IF', Tif, startrow=ID)
self.datatable.putcol('NPOL', Tpol, startrow=ID)
self.datatable.putcol('BEAM', Tbeam, startrow=ID)
self.datatable.putcol('TIME', Tmjd/86400.0, startrow=ID)
self.datatable.putcol('ELAPSED', Tmjd-Tmjd[0], startrow=ID)
self.datatable.putcol('EXPOSURE', Texpt, startrow=ID)
self.datatable.putcol('FIELD_ID', field_ids, startrow=ID)
Tra = numpy.zeros(nrow, dtype=numpy.float64)
Tdec = numpy.zeros(nrow, dtype=numpy.float64)
Tshift_ra = numpy.zeros(nrow, dtype=numpy.float64)
Tshift_dec = numpy.zeros(nrow, dtype=numpy.float64)
Tofs_ra = numpy.zeros(nrow, dtype=numpy.float64)
Tofs_dec = numpy.zeros(nrow, dtype=numpy.float64)
Taz = numpy.zeros(nrow, dtype=numpy.float64)
Tel = numpy.zeros(nrow, dtype=numpy.float64)
index = numpy.lexsort((Tant, Tmjd))
LOG.info('Start reading direction (convert if necessary). It may take a while.')
with casa_tools.MSMDReader(name) as msmd:
nprogress = 5000
iprogress = 0
last_mjd = None
last_antenna = None
last_result = None
ref_direction = None
for irow in index:
iprogress += 1
if iprogress >= nprogress and iprogress % nprogress == 0:
                        LOG.info('{}/{}'.format(iprogress, nrow))
row = rows[irow]
mjd_in_sec = Tmjd[irow]
antenna_id = Tant[irow]
if mjd_in_sec == last_mjd and antenna_id == last_antenna:
Taz[irow] = last_result[0]
Tel[irow] = last_result[1]
Tra[irow] = last_result[2]
Tdec[irow] = last_result[3]
Tshift_ra[irow] = last_result[4]
Tshift_dec[irow] = last_result[5]
Tofs_ra[irow] = last_result[6]
Tofs_dec[irow] = last_result[7]
continue
me = casa_tools.measures
qa = casa_tools.quanta
mepoch = me.epoch(rf=time_frame, v0=qa.quantity(mjd_in_sec, 's'))
# now mposition is prepared in mpositions
mposition = mpositions[antenna_id]
# CASR-494
try:
pointing_directions = msmd.pointingdirection(row, interpolate=True)
except RuntimeError as e:
LOG.warn(e)
if str(e).find('SSMIndex::getIndex - access to non-existing row') != -1:
LOG.warn('{}: Missing pointing data for row {} (antenna {} time {})'.format(ms.basename, rows[irow], Tant[irow], Tmjd[irow]))
# register DataTable row to self.invalid_pointing_data
dt_row = ID + irow
self.register_invalid_pointing_data(antenna_id, dt_row)
NAN = numpy.nan
Taz[irow] = NAN
Tel[irow] = NAN
Tra[irow] = NAN
Tdec[irow] = NAN
Tshift_ra[irow] = NAN
Tshift_dec[irow] = NAN
Tofs_ra[irow] = NAN
Tofs_dec[irow] = NAN
Tflagrow[irow] = True
continue
else:
raise e
pointing_direction = pointing_directions['antenna1']['pointingdirection'] # antenna2 should be the same
lon = pointing_direction['m0']
lat = pointing_direction['m1']
ref = pointing_direction['refer']
# 2018/04/18 TN
# CAS-10874 single dish pipeline should use ICRS instead of J2000
if ref in [azelref]:
if irow == 0:
LOG.info('Require direction conversion from {0} to {1}'.format(ref, outref))
Taz[irow] = get_value_in_deg(lon)
Tel[irow] = get_value_in_deg(lat)
                        # conversion to outref (ICRS or J2000)
ra, dec = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=outref)
Tra[irow] = get_value_in_deg(ra)
Tdec[irow] = get_value_in_deg(dec)
elif ref in [outref]:
if irow == 0:
LOG.info('Require direction conversion from {0} to {1}'.format(ref, azelref))
Tra[irow] = get_value_in_deg(lon)
Tdec[irow] = get_value_in_deg(lat)
# conversion to AZELGEO
az, el = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=azelref)
Taz[irow] = get_value_in_deg(az)
Tel[irow] = get_value_in_deg(el)
else:
if irow == 0:
LOG.info('Require direction conversion from {0} to {1} as well as to {2}'.format(ref, outref,
azelref))
                        # conversion to outref (ICRS or J2000)
ra, dec = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=outref)
Tra[irow] = get_value_in_deg(ra)
Tdec[irow] = get_value_in_deg(dec)
# conversion to AZELGEO
az, el = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=azelref)
Taz[irow] = get_value_in_deg(az)
Tel[irow] = get_value_in_deg(el)
# Calculate shift_ra/dec and pack them into Tshift_ra/dec
field_id = field_ids[irow]
if field_id not in ephemsrc_names:
raise RuntimeError("ephemsrc_name for field_id={0} does not exist".format(field_id) )
if ephemsrc_names[field_id] == "":
Tshift_ra[irow] = Tra[irow]
Tshift_dec[irow] = Tdec[irow]
Tofs_ra[irow] = Tra[irow]
Tofs_dec[irow] = Tdec[irow]
else:
source_name = ephemsrc_names[field_id]
if source_name not in org_directions:
raise RuntimeError( "Ephemeris source {0} does not exist in org_directions".format(source_name) )
org_direction = org_directions[source_name]
fields = ms.get_fields( field_id = field_id )
is_known_eph_obj = fields[0].source.is_known_eph_obj
ref_direction = self.get_reference_direction( source_name, ephem_tables[field_id], is_known_eph_obj, mepoch, mposition, outref )
direction2 = me.measure( pointing_direction, outref )
shift_direction = dirutil.direction_shift( direction2, ref_direction, org_direction )
shift_ra, shift_dec = dirutil.direction_convert( shift_direction, mepoch, mposition, outframe=outref )
Tshift_ra[irow] = get_value_in_deg(shift_ra)
Tshift_dec[irow] = get_value_in_deg(shift_dec)
ofs_direction = dirutil.direction_offset( direction2, ref_direction )
ofs_ra, ofs_dec = dirutil.direction_convert( ofs_direction, mepoch, mposition, outframe=outref )
Tofs_ra[irow] = get_value_in_deg(ofs_ra)
Tofs_dec[irow] = get_value_in_deg(ofs_dec)
last_mjd = mjd_in_sec
last_antenna = antenna_id
last_result = (Taz[irow], Tel[irow], Tra[irow], Tdec[irow], Tshift_ra[irow], Tshift_dec[irow], Tofs_ra[irow], Tofs_dec[irow])
# PIPE-646 replace NaN's with nominal value
set_nominal_direction(Tant, Tsrctype, Taz, Tel, Tra, Tdec, Tshift_ra, Tshift_dec, Tofs_ra, Tofs_dec)
assert numpy.all(numpy.isfinite(Taz))
assert numpy.all(numpy.isfinite(Tel))
assert numpy.all(numpy.isfinite(Tra))
assert numpy.all(numpy.isfinite(Tdec))
assert numpy.all(numpy.isfinite(Tshift_ra))
assert numpy.all(numpy.isfinite(Tshift_dec))
assert numpy.all(numpy.isfinite(Tofs_ra))
assert numpy.all(numpy.isfinite(Tofs_dec))
LOG.info('Done reading direction (convert if necessary).')
            # save org_direction if it exists
if 'org_direction' in locals():
self.datatable.putkeyword( 'ORG_DIRECTION', org_direction )
self.datatable.putcol('RA', Tra, startrow=ID)
self.datatable.putcol('DEC', Tdec, startrow=ID)
self.datatable.putcol('SHIFT_RA', Tshift_ra, startrow=ID)
self.datatable.putcol('SHIFT_DEC', Tshift_dec, startrow=ID)
self.datatable.putcol('OFS_RA', Tofs_ra, startrow=ID)
self.datatable.putcol('OFS_DEC', Tofs_dec, startrow=ID)
self.datatable.putcol('AZ', Taz, startrow=ID)
self.datatable.putcol('EL', Tel, startrow=ID)
self.datatable.putcol('NCHAN', NchanArray, startrow=ID)
self.datatable.putcol('TARGET', Tsrc, startrow=ID)
intArr = numpy.zeros(nrow, dtype=int)
self.datatable.putcol('NMASK', intArr, startrow=ID)
intArr[:] = -1
self.datatable.putcol('NOCHANGE', intArr, startrow=ID)
self.datatable.putcol('POSGRP', intArr, startrow=ID)
self.datatable.putcol('ANTENNA', Tant, startrow=ID)
self.datatable.putcol('SRCTYPE', Tsrctype, startrow=ID)
# row base storing
masklist = ColMaskList.NoMask
# Tsys will be overwritten in applycal stage
tsys_template = numpy.ones(4, dtype=numpy.float32)
flag_summary_template = numpy.ones(4, dtype=numpy.int32)
stats_template = numpy.zeros((4, 7), dtype=numpy.int32) - 1
flags_template = numpy.ones((4, 7), dtype=numpy.int32)
pflags_template = numpy.ones((4, 4), dtype=numpy.int32)
for x in range(nrow):
# FLAGROW is mapped into OnlineFlag (PermanentFlag[3])
# NOTE: data is valid if Tflagrow is 0
# data is valid if pflags[3] is 1
pflags_template[:, OnlineFlagIndex] = 1 if Tflagrow[x] == 0 else 0
sDate = mjd_to_datestring(Tmjd[x]/86400.0, unit='day')
self.datatable.putcell('DATE', ID, sDate)
self.datatable.putcell('MASKLIST', ID, masklist)
# polarization dependent arrays
npol = self.datatable.getcell('NPOL', ID)
self.datatable.putcell('STATISTICS', ID, stats_template[:npol])
self.datatable.putcell('FLAG', ID, flags_template[:npol])
self.datatable.putcell('FLAG_PERMANENT', ID, pflags_template[:npol])
self.datatable.putcell('FLAG_SUMMARY', ID, flag_summary_template[:npol])
self.datatable.putcell('TSYS', ID, tsys_template[:npol])
ID += 1
num_antenna = len(self.ms.antennas)
self.vAnt += num_antenna
self.appended_row = nrow
# PIPE-646 & PIPE-647
# generate flag commands
self.generate_flagcmd()
return org_directions
def _get_outref(self):
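        """Return the direction reference frame for the DataTable.

        The reference is taken from the representative target if available,
        otherwise from the TARGET fields, preferring ICRS then J2000.
        """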
outref = None
if self.ms.representative_target[0] is not None:
# if ms has representative target, take reference from that
LOG.info(
'Use direction reference for representative target "{0}".'.format(self.ms.representative_target[0]))
representative_source_name = self.ms.representative_target[0]
dirrefs = numpy.unique([f.mdirection['refer'] for f in self.ms.fields
if f.source.name == representative_source_name])
if len(dirrefs) == 0:
raise RuntimeError('Failed to get direction reference for representative source.')
else:
# if representative target is not given, take reference from one of the targets
dirrefs = numpy.unique([f.mdirection['refer'] for f in self.ms.fields if 'TARGET' in f.intents])
if len(dirrefs) == 0:
# no target field exists, something wrong
raise RuntimeError('No TARGET field exists.')
if len(dirrefs) == 1:
outref = dirrefs[0]
else:
# direction reference is not unique, search desired ref
if 'ICRS' in dirrefs:
outref = 'ICRS'
elif 'J2000' in dirrefs:
outref = 'J2000'
else:
# use first one
outref = dirrefs[0]
if outref is None:
raise RuntimeError('Failed to get direction reference for TARGET.')
return outref
@staticmethod
def _get_azelref():
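        """Return the frame used for Azimuth/Elevation directions (AZELGEO)."""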
return 'AZELGEO'
    def get_reference_direction( self, source_name, ephem_table, is_known_eph_obj, mepoch, mposition, outframe):
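        """Return the reference direction of an ephemeris source for the given epoch and position.

        If ``ephem_table`` is non-empty, the direction is obtained from the
        ephemeris table via framecomet; otherwise the source must be a known
        ephemeris object of the measures tool.
        """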
me = casa_tools.measures
if ephem_table != "":
me.framecomet( ephem_table )
me.doframe(mepoch)
me.doframe(mposition)
obj_azel = me.measure( me.direction('COMET'), 'AZELGEO' )
ref = me.measure( obj_azel, outframe )
else:
# if source_name.upper() in known_ephemeris_list:
if is_known_eph_obj:
me.doframe(mepoch)
me.doframe(mposition)
obj_azel = me.measure( me.direction(source_name.upper()), 'AZELGEO' )
ref = me.measure( obj_azel, outframe )
else:
raise RuntimeError( "{0} is not registered in known_ephemeris_list".format(source_name) )
return ref