Source code for pipeline.hsd.tasks.importdata.reader

import collections
import glob
import itertools
import os
import shutil
import string

import numpy

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.logging as logging
from pipeline.domain.datatable import DataTableImpl as DataTable
from pipeline.domain.datatable import DataTableColumnMaskList as ColMaskList
from pipeline.domain.datatable import OnlineFlagIndex
from pipeline.hsd.tasks.common import mjd_to_datestring, TableSelector
from pipeline.infrastructure import casa_tools

from ..common import rasterutil
from ..common import direction_utils as dirutil

LOG = infrastructure.get_logger(__name__)


[docs]def get_value_in_deg(quantity): qa = casa_tools.quanta return qa.getvalue(qa.convert(quantity, 'deg'))
[docs]def get_state_id(ms, spw, intent): states = (s for s in ms.states if intent in s.intents) obs_modes = set() for s in states: modes = set(s.get_obs_mode_for_intent(intent)) obs_modes.update(modes) state_ids = set() with casa_tools.MSReader(ms.name) as msreader: for obs_mode in obs_modes: msreader.msselect({'spw': spw, 'scanintent': obs_mode}, onlyparse=True) indices = msreader.msselectedindices() state_ids.update(indices['stateid']) # Need to reset explicitly after CASA 5.3. See CAS-11088 for detail. msreader.selectinit(reset=True) return numpy.fromiter(state_ids, dtype=numpy.int32)
[docs]def merge_timerange(timerange_list): timegap_list = numpy.asarray([l1[0] - l0[1] for l0, l1 in zip(timerange_list, timerange_list[1:])]) LOG.info(f'timegap_list is {timegap_list}') # regard timegap <= 0.1msec as continuous gap_index = [-1] + numpy.where(timegap_list > 1e-4)[0].tolist() + [len(timegap_list)] timerange_merged = [[timerange_list[i + 1][0], timerange_list[j][1]] for i, j in zip(gap_index, gap_index[1:])] LOG.info(f'timerange_merged is {timerange_merged}') return timerange_merged
[docs]def initialize_template(flagtemplate): # remove existing template file if os.path.exists(flagtemplate): os.remove(flagtemplate) # initialize template file with header with open(flagtemplate, 'w') as f: f.write('# This template file is auto-generated by hsd_importdata to flag MS rows based on pointing data \n')
[docs]def write_flagcmd(flagtemplate, antenna_id, timerange_list, reason=''): qa = casa_tools.quanta sanitized = reason.replace(' ', '_') template = string.Template(f"mode='manual' antenna='{antenna_id}&&&' timerange='$timerange' reason='SDPL:{sanitized}'\n") with open(flagtemplate, 'a') as f: for timerange in timerange_list: # timerange is [start, end] list of MJD [sec] rangestr = '~'.join(map(lambda t: '{year}/{month}/{monthday}/{hour}:{min}:{s:.7f}'.format(**qa.splitdate(qa.quantity(t, 's'))), timerange)) f.write(template.safe_substitute(timerange=rangestr))
[docs]def set_nominal_direction(ant, srctype, az, el, ra, dec, shift_ra, shift_dec, offset_ra, offset_dec): # check if there are NaN's isvalid = numpy.logical_not(numpy.isnan(az)) if numpy.all(isvalid): # no NaN's so exit this loop print('no NaN') return for _a, _s in itertools.product(set(ant), set(srctype)): # select data print(f'ant {_a} src {_s}') sel = numpy.logical_and(ant == _a, srctype == _s) if numpy.all(numpy.logical_not(sel)): # no data for this selection print('no data') continue # separate NaN's mask = numpy.logical_and(isvalid, sel) if numpy.all(mask[sel]): # no NaN's for this selection print('no NaN for this selection') continue nanmask = numpy.logical_and(numpy.logical_not(isvalid), sel) _az = numpy.median(az[mask]) print('nominal az {}'.format(_az)) _el = numpy.median(el[mask]) print('nominal el {}'.format(_el)) _ra = numpy.median(ra[mask]) print('nominal ra {}'.format(_ra)) _dec = numpy.median(dec[mask]) print('nominal dec {}'.format(_dec)) _shift_ra = numpy.median(shift_ra[mask]) print('nominal shift_ra {}'.format(_shift_ra)) _shift_dec = numpy.median(shift_dec[mask]) print('nominal shift_dec {}'.format(_shift_dec)) _offset_ra = numpy.median(offset_ra[mask]) print('nominal offset_ra {}'.format(_offset_ra)) _offset_dec = numpy.median(offset_dec[mask]) print('nominal offset_dec {}'.format(_offset_dec)) az[nanmask] = _az el[nanmask] = _el ra[nanmask] = _ra dec[nanmask] = _dec shift_ra[nanmask] = _shift_ra shift_dec[nanmask] = _shift_dec offset_ra[nanmask] = _offset_ra offset_dec[nanmask] = _offset_dec
[docs]class MetaDataReader(object): def __init__(self, context, ms, table_name): """ context -- pipeline context mses -- list of measurementset domain objects table_name -- name of DataTable """ self.context = context self.ms = ms self.table_name = table_name # existing table should be the one generated by the previous run # so it must be overwritten if os.path.exists(self.table_name): shutil.rmtree(self.table_name) self.datatable = DataTable(name=self.table_name, readonly=False) self.vAnt = 0 self.appended_row = 0 # invalid_pointing_data is dictionary (collections.defaultdict) # it stores an information of the data that caused # msmd.pointingdirection to fail due to the lack of # pointing data in POINTING table # key: antenna id # value: list of DataTable rows for invalid data self.invalid_pointing_data = collections.defaultdict(list) # remove flag template for pointing ms_prefix = os.path.splitext(self.ms.basename)[0] self.flagtemplate = os.path.join(self.context.output_dir, f'{ms_prefix}.flagpointing.txt') initialize_template(self.flagtemplate) @property def name(self): return self.ms.name
[docs] def get_datatable(self): return self.datatable
[docs] def detect_target_spw(self): if not hasattr(self, 'name'): return [] ms = self.ms spws = ms.get_spectral_windows(science_windows_only=True) return [x.id for x in spws]
[docs] def detect_target_data_desc(self): science_windows = self.detect_target_spw() ms = self.ms def _g(): for spwid in science_windows: dd = ms.get_data_description(spw=spwid) assert dd is not None yield dd.id dds = numpy.fromiter(_g(), dtype=numpy.int32) return dds
[docs] def register_invalid_pointing_data(self, antenna_id, row): self.invalid_pointing_data[antenna_id].append(row)
[docs] def generate_flagcmd(self): # PIPE-646 # per-antenna row list for the data without pointing data # key: antenna id # value: list of rows to be flagged flagdict1 = self.generate_flagdict_for_invalid_pointing_data() # PIPE-647 # per-antenna row list for uniform image rms # key: antenna id # value: list of rows to be flagged flagdict2 = self.generate_flagdict_for_uniform_rms() # do nothing if no rows to be flagged nflag1 = sum(map(len, flagdict1.values())) nflag2 = sum(map(len, flagdict2.values())) if nflag1 + nflag2 == 0: return # for debugging if LOG.isEnabledFor(logging.DEBUG): merged = collections.defaultdict(list) for k, v in itertools.chain(flagdict1.items(), flagdict2.items()): merged[k].extend(v) for aid, dt_rows in merged.items(): with open(f'toflag_ant{aid}.txt', 'w') as f: f.write('\n'.join(map(lambda x: str(self.datatable.getcell('ROW', x)), dt_rows))) f.write('\n') # generate flag command self._generate_flagcmd(self.flagtemplate, flagdict1, reason='missing pointing data') self._generate_flagcmd(self.flagtemplate, flagdict2, reason='uniform image rms')
[docs] def generate_flagdict_for_invalid_pointing_data(self): if len(self.invalid_pointing_data) > 0: LOG.warn('{}: There are rows without corresponding POINTING data'.format(self.ms.basename)) LOG.warn('Affected antennas are: {}'.format(' '.join( [self.ms.antennas[k].name for k in self.invalid_pointing_data]))) return self.invalid_pointing_data
[docs] def generate_flagdict_for_uniform_rms(self): # extract necessary metadata from datatable metadata = rasterutil.read_datatable(self.datatable) # apply flagging heuristics flag_list = rasterutil.flag_raster_map(metadata) # generate dictionary flag_dict = dict(enumerate(flag_list)) return flag_dict
def _generate_flagcmd(self, flagtemplate, flag_dict, reason=''): datatable = self.datatable for antenna_id, rowlist in flag_dict.items(): time_list, sort_index = numpy.unique( [datatable.getcell('TIME', row) for row in rowlist], return_index=True) # day -> sec time_list *= 86400.0 interval_list = [datatable.getcell('EXPOSURE', row) for row in numpy.asarray(rowlist)[sort_index]] LOG.info(f'antenna {antenna_id}: time_list is {time_list}') if len(time_list) == 0: continue timerange_list = [[t - i / 2, t + i / 2] for t, i in zip(time_list, interval_list)] LOG.info(f'antenna {antenna_id}: timerange_list is {timerange_list}') timerange_merged = merge_timerange(timerange_list) write_flagcmd(flagtemplate, antenna_id, timerange_merged, reason)
[docs] def execute(self, dry_run=True): if dry_run: return # name of the MS name = self.name spwids = self.detect_target_spw() nchan_map = dict([(spwid, self.ms.get_spectral_window(spwid).num_channels) for spwid in spwids]) ddids = self.detect_target_data_desc() #Rad2Deg = 180. / 3.141592653 # FILENAME keyword stores name of the MS LOG.info('name=%s' % name) self.datatable.putkeyword('FILENAME', name) # 2018/04/18 TN # CAS-10874 single dish pipeline should use ICRS instead of J2000 # For consistency throuout the single dish pipeline, direction # reference frame is stored in the DataTable outref = self._get_outref() # register direction reference to datatable self.datatable.direction_ref = outref azelref = self._get_azelref() #LOG.info('outref="{0}" azelref="{1}"'.format(outref, azelref)) ms = self.ms assert ms is not None spwsel = ','.join(map(str, spwids)) target = 'TARGET' reference = 'REFERENCE' assert target in ms.intents #assert reference in ms.intents target_states = get_state_id(ms, spwsel, target) reference_states = get_state_id(ms, spwsel, reference) # target_states = (s for s in ms.states if target in s.intents) # reference_states = (s for s in ms.states if reference in s.intents) # target_obs_modes = set() # for s in target_states: # modes = set(s.get_obs_mode_for_intent(target)) # target_obs_modes.update(modes) # modes = set(s.get_obs_mode_for_intent(reference)) # obs_modes.update(modes) # state_ids = set() # with casa_tools.MSReader(name) as msreader: # for obs_mode in obs_modes: # msreader.msselect({'spw': spwsel, 'scanintent': obs_mode}, onlyparse=True) # indices = msreader.msselectedindices() # state_ids.update(indices['stateid']) # state_ids = numpy.fromiter(state_ids, dtype=numpy.int32) state_ids = numpy.concatenate([target_states, reference_states]) target_state_ids = numpy.concatenate([target_states]) # get antenna position list mpositions = [ a.position for a in ms.antennas ] # get names of ephemeris sources (excludes 'COMET') me = casa_tools.measures direction_codes = me.listcodes( me.direction() ) ephemeris_list = direction_codes['extra'] known_ephemeris_list = numpy.delete( ephemeris_list, numpy.where(ephemeris_list=='COMET') ) # set org_directions ephemsrc_list = [] # list of ephemsrc names (unique appearance) ephemsrc_names = {} # ephemsrc name for each field_id ephem_tables = {} # epheris table name for each field_id if applicasble # ephem_tables to be "" for known_ephemeris_list and non-ephemeris sources with casa_tools.TableReader(os.path.join(name, 'FIELD')) as tb: field_ids = list(range(tb.nrows())) for field_id in list(set(field_ids)): fields = ms.get_fields( field_id = field_id ) source_name = (fields[0].source.name) # removed upper() 2019.5.14 if 'EPHEMERIS_ID' in tb.colnames(): ephemeris_ids = tb.getcol( 'EPHEMERIS_ID' ) else: ephemeris_ids = [] # check if known_ephemeris_sources if source_name.upper() in known_ephemeris_list and not fields[0].source.is_eph_obj: fields[0].source.is_known_eph_obj = True else: fields[0].source.is_known_eph_obj = False # ephemeris source with ephemeris table if fields[0].source.is_eph_obj: ephemsrc_names.update( { field_id:source_name } ) if source_name.upper not in ephemsrc_list: # found a new ephemeris source ephemsrc_list.append( source_name ) # pick ephemeris table name ephem_table_files = glob.glob( ms.name+'/FIELD/EPHEM'+str(ephemeris_ids[field_id])+'_*.tab' ) if len(ephem_table_files) > 1: raise RuntimeError( "multiple ephemeris tables found for field_id={}".format(field_id) ) if len(ephem_table_files) == 1: ephem_table_file = ephem_table_files[0] # double check the source name in ephem_table_file with casa_tools.TableReader(ephem_table_file) as tb2: keywords = tb2.getkeywords() if keywords['NAME'] != source_name: raise RuntimeError( "source name in ephemeris table {0} was {1}, inconsistent with {2}".format( ephem_table_file, keywords['NAME'], source_name ) ) ephem_tables.update( {field_id:ephem_table_file} ) LOG.info( "FIELD_ID={} ({}) with ephemeris table {}".format( field_id, source_name, ephem_table_file ) ) # known ephemeris source without ephemeirs table (not applicable for ALMA) elif fields[0].source.is_known_eph_obj: ephemsrc_names.update( { field_id:source_name } ) ephem_tables.update( {field_id:'' } ) LOG.info( "FIELD_ID={} ({}) as KNOWN EPHEMERIS SOURCE".format( field_id, source_name ) ) # non-ephemeris source else: ephemsrc_names.update( { field_id:'' } ) ephem_tables.update( {field_id:'' } ) LOG.info( "FIELD_ID={} ({}) as NORMAL SOURCE".format( field_id, source_name) ) with TableSelector(name, 'ANTENNA1 == ANTENNA2 && FEED1 == FEED2 && DATA_DESC_ID IN %s && STATE_ID IN %s'%(list(ddids), list(target_state_ids))) as tb: # find the first onsrc for each ephemeris source and pack org_directions org_directions = {} nrow = tb.nrows() for irow in range(nrow): field_id = tb.getcell( 'FIELD_ID', irow ) if field_id not in ephemsrc_names: raise RuntimeError( "ephemsrc_name for field_id={0} does not exist".format(field_id) ) if ephemsrc_names[field_id] != "": source_name = ephemsrc_names[field_id] if source_name not in org_directions: mjd_in_sec = tb.getcell( 'TIME', irow ) antenna_id = tb.getcell( 'ANTENNA1', irow ) time_meas = tb.getcolkeyword( 'TIME', 'MEASINFO' ) time_frame = time_meas['Ref'] me = casa_tools.measures qa = casa_tools.quanta mepoch = me.epoch(rf=time_frame, v0=qa.quantity(mjd_in_sec, 's')) antennas = self.ms.get_antenna(antenna_id) assert len(antennas) == 1 antenna_domain = antennas[0] mposition = antenna_domain.position fields = ms.get_fields( field_id = field_id ) is_known_eph_obj = fields[0].source.is_known_eph_obj org_direction = self.get_reference_direction( source_name, ephem_tables[field_id], is_known_eph_obj, mepoch, mposition, outref ) org_directions.update( {source_name:org_direction} ); with casa_tools.TableReader(os.path.join(name, 'FIELD')) as tb: field_ids = list(range(tb.nrows())) for field_id in list(set(field_ids)): fields = ms.get_fields( field_id = field_id ) source_name = fields[0].source.name if source_name in org_directions: fields[0].source.org_direction = org_directions[source_name] LOG.info( "registering org_direction[{}] (field_id={} of {}) as {}".format( source_name, field_id, name, org_directions[source_name] )) else: org_direction = None with TableSelector(name, 'ANTENNA1 == ANTENNA2 && FEED1 == FEED2 && DATA_DESC_ID IN %s && STATE_ID IN %s'%(list(ddids), list(state_ids))) as tb: nrow = tb.nrows() rows = tb.rownumbers() Texpt = tb.getcol('INTERVAL') Tmjd = tb.getcol('TIME') time_meas = tb.getcolkeyword('TIME', 'MEASINFO') time_frame = time_meas['Ref'] Tscan = tb.getcol('SCAN_NUMBER') TDD = tb.getcol('DATA_DESC_ID') ddspwmap = numpy.vectorize(lambda x: ms.get_data_description(id=x).spw.id, otypes=[numpy.int32]) ddnpolmap = numpy.vectorize(lambda x: ms.get_data_description(id=x).num_polarizations, otypes=[numpy.int32]) Tif = ddspwmap(TDD) Tpol = ddnpolmap(TDD) Tant = tb.getcol('ANTENNA1') Tbeam = tb.getcol('FEED1') Tsrctype = numpy.fromiter((0 if i in target_states else 1 for i in tb.getcol('STATE_ID')), dtype=numpy.int32) Tflagrow = tb.getcol('FLAG_ROW') Tflag = numpy.fromiter((numpy.all(tb.getcell('FLAG', i) == True) for i in range(nrow)), dtype=bool) Tflagrow = numpy.logical_or(Tflagrow, Tflag) field_ids = tb.getcol('FIELD_ID') getsourcename = numpy.vectorize(lambda x: ms.get_fields(x)[0].source.name, otypes=['str']) Tsrc = getsourcename(field_ids) NchanArray = numpy.fromiter((nchan_map[n] for n in Tif), dtype=numpy.int) ID = len(self.datatable) LOG.info('ID=%s' % ID) #ROWs = [] #IDs = [] self.datatable.addrows(nrow) # column based storing self.datatable.putcol('ROW', rows, startrow=ID) self.datatable.putcol('SCAN', Tscan, startrow=ID) self.datatable.putcol('IF', Tif, startrow=ID) self.datatable.putcol('NPOL', Tpol, startrow=ID) self.datatable.putcol('BEAM', Tbeam, startrow=ID) self.datatable.putcol('TIME', Tmjd/86400.0, startrow=ID) self.datatable.putcol('ELAPSED', Tmjd-Tmjd[0], startrow=ID) self.datatable.putcol('EXPOSURE', Texpt, startrow=ID) self.datatable.putcol('FIELD_ID', field_ids, startrow=ID) Tra = numpy.zeros(nrow, dtype=numpy.float64) Tdec = numpy.zeros(nrow, dtype=numpy.float64) Tshift_ra = numpy.zeros(nrow, dtype=numpy.float64) Tshift_dec = numpy.zeros(nrow, dtype=numpy.float64) Tofs_ra = numpy.zeros(nrow, dtype=numpy.float64) Tofs_dec = numpy.zeros(nrow, dtype=numpy.float64) Taz = numpy.zeros(nrow, dtype=numpy.float64) Tel = numpy.zeros(nrow, dtype=numpy.float64) index = numpy.lexsort((Tant, Tmjd)) LOG.info('Start reading direction (convert if necessary). It may take a while.') with casa_tools.MSMDReader(name) as msmd: nprogress = 5000 iprogress = 0 last_mjd = None last_antenna = None last_result = None ref_direction = None for irow in index: iprogress += 1 if iprogress >= nprogress and iprogress % nprogress == 0: print('{}/{}'.format(iprogress, nrow)) row = rows[irow] mjd_in_sec = Tmjd[irow] antenna_id = Tant[irow] if mjd_in_sec == last_mjd and antenna_id == last_antenna: Taz[irow] = last_result[0] Tel[irow] = last_result[1] Tra[irow] = last_result[2] Tdec[irow] = last_result[3] Tshift_ra[irow] = last_result[4] Tshift_dec[irow] = last_result[5] Tofs_ra[irow] = last_result[6] Tofs_dec[irow] = last_result[7] continue me = casa_tools.measures qa = casa_tools.quanta mepoch = me.epoch(rf=time_frame, v0=qa.quantity(mjd_in_sec, 's')) # now mposition is prepared in mpositions # antennas = self.ms.get_antenna(antenna_id) # assert len(antennas) == 1 # antenna_domain = antennas[0] # mposition = antenna_domain.position mposition = mpositions[antenna_id] # CASR-494 try: pointing_directions = msmd.pointingdirection(row, interpolate=True) except RuntimeError as e: LOG.warn(e) if str(e).find('SSMIndex::getIndex - access to non-existing row') != -1: LOG.warn('{}: Missing pointing data for row {} (antenna {} time {})'.format(ms.basename, rows[irow], Tant[irow], Tmjd[irow])) # register DataTable row to self.invalid_pointing_data dt_row = ID + irow self.register_invalid_pointing_data(antenna_id, dt_row) NAN = numpy.nan Taz[irow] = NAN Tel[irow] = NAN Tra[irow] = NAN Tdec[irow] = NAN Tshift_ra[irow] = NAN Tshift_dec[irow] = NAN Tofs_ra[irow] = NAN Tofs_dec[irow] = NAN Tflagrow[irow] = True continue else: raise e pointing_direction = pointing_directions['antenna1']['pointingdirection'] # antenna2 should be the same lon = pointing_direction['m0'] lat = pointing_direction['m1'] ref = pointing_direction['refer'] # 2018/04/18 TN # CAS-10874 single dish pipeline should use ICRS instead of J2000 if ref in [azelref]: if irow == 0: LOG.info('Require direction conversion from {0} to {1}'.format(ref, outref)) Taz[irow] = get_value_in_deg(lon) Tel[irow] = get_value_in_deg(lat) # conversion to J2000 ra, dec = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=outref) Tra[irow] = get_value_in_deg(ra) Tdec[irow] = get_value_in_deg(dec) elif ref in [outref]: if irow == 0: LOG.info('Require direction conversion from {0} to {1}'.format(ref, azelref)) Tra[irow] = get_value_in_deg(lon) Tdec[irow] = get_value_in_deg(lat) # conversion to AZELGEO az, el = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=azelref) Taz[irow] = get_value_in_deg(az) Tel[irow] = get_value_in_deg(el) else: if irow == 0: LOG.info('Require direction conversion from {0} to {1} as well as to {2}'.format(ref, outref, azelref)) # conversion to J2000 ra, dec = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=outref) Tra[irow] = get_value_in_deg(ra) Tdec[irow] = get_value_in_deg(dec) # conversion to AZELGEO az, el = dirutil.direction_convert(pointing_direction, mepoch, mposition, outframe=azelref) Taz[irow] = get_value_in_deg(az) Tel[irow] = get_value_in_deg(el) # Calculate shift_ra/dec and pack them into Tshift_ra/dec field_id = field_ids[irow] if field_id not in ephemsrc_names: raise RuntimeError("ephemsrc_name for field_id={0} does not exist".format(field_id) ) if ephemsrc_names[field_id] == "": Tshift_ra[irow] = Tra[irow] Tshift_dec[irow] = Tdec[irow] Tofs_ra[irow] = Tra[irow] Tofs_dec[irow] = Tdec[irow] else: source_name = ephemsrc_names[field_id] if source_name not in org_directions: raise RuntimeError( "Ephemeris source {0} does not exist in org_directions".format(source_name) ) org_direction = org_directions[source_name] fields = ms.get_fields( field_id = field_id ) is_known_eph_obj = fields[0].source.is_known_eph_obj ref_direction = self.get_reference_direction( source_name, ephem_tables[field_id], is_known_eph_obj, mepoch, mposition, outref ) direction2 = me.measure( pointing_direction, outref ) shift_direction = dirutil.direction_shift( direction2, ref_direction, org_direction ) shift_ra, shift_dec = dirutil.direction_convert( shift_direction, mepoch, mposition, outframe=outref ) Tshift_ra[irow] = get_value_in_deg(shift_ra) Tshift_dec[irow] = get_value_in_deg(shift_dec) ofs_direction = dirutil.direction_offset( direction2, ref_direction ) ofs_ra, ofs_dec = dirutil.direction_convert( ofs_direction, mepoch, mposition, outframe=outref ) Tofs_ra[irow] = get_value_in_deg(ofs_ra) Tofs_dec[irow] = get_value_in_deg(ofs_dec) last_mjd = mjd_in_sec last_antenna = antenna_id last_result = (Taz[irow], Tel[irow], Tra[irow], Tdec[irow], Tshift_ra[irow], Tshift_dec[irow], Tofs_ra[irow], Tofs_dec[irow]) # PIPE-646 replace NaN's with nominal value set_nominal_direction(Tant, Tsrctype, Taz, Tel, Tra, Tdec, Tshift_ra, Tshift_dec, Tofs_ra, Tofs_dec) assert numpy.all(numpy.isfinite(Taz)) assert numpy.all(numpy.isfinite(Tel)) assert numpy.all(numpy.isfinite(Tra)) assert numpy.all(numpy.isfinite(Tdec)) assert numpy.all(numpy.isfinite(Tshift_ra)) assert numpy.all(numpy.isfinite(Tshift_dec)) assert numpy.all(numpy.isfinite(Tofs_ra)) assert numpy.all(numpy.isfinite(Tofs_dec)) LOG.info('Done reading direction (convert if necessary).') # save org_directions if exists if 'org_direction' in locals(): self.datatable.putkeyword( 'ORG_DIRECTION', org_direction ) self.datatable.putcol('RA', Tra, startrow=ID) self.datatable.putcol('DEC', Tdec, startrow=ID) self.datatable.putcol('SHIFT_RA', Tshift_ra, startrow=ID) self.datatable.putcol('SHIFT_DEC', Tshift_dec, startrow=ID) self.datatable.putcol('OFS_RA', Tofs_ra, startrow=ID) self.datatable.putcol('OFS_DEC', Tofs_dec, startrow=ID) self.datatable.putcol('AZ', Taz, startrow=ID) self.datatable.putcol('EL', Tel, startrow=ID) self.datatable.putcol('NCHAN', NchanArray, startrow=ID) self.datatable.putcol('TARGET', Tsrc, startrow=ID) intArr = numpy.zeros(nrow, dtype=int) self.datatable.putcol('NMASK', intArr, startrow=ID) intArr[:] = -1 self.datatable.putcol('NOCHANGE', intArr, startrow=ID) self.datatable.putcol('POSGRP', intArr, startrow=ID) self.datatable.putcol('ANTENNA', Tant, startrow=ID) self.datatable.putcol('SRCTYPE', Tsrctype, startrow=ID) # row base storing masklist = ColMaskList.NoMask # Tsys will be overwritten in applycal stage tsys_template = numpy.ones(4, dtype=numpy.float32) flag_summary_template = numpy.ones(4, dtype=numpy.int32) stats_template = numpy.zeros((4, 7), dtype=numpy.int32) - 1 flags_template = numpy.ones((4, 7), dtype=numpy.int32) pflags_template = numpy.ones((4, 4), dtype=numpy.int32) for x in range(nrow): # FLAGROW is mapped into OnlineFlag (PermanentFlag[3]) # NOTE: data is valid if Tflagrow is 0 # data is valid if pflags[3] is 1 pflags_template[:, OnlineFlagIndex] = 1 if Tflagrow[x] == 0 else 0 sDate = mjd_to_datestring(Tmjd[x]/86400.0, unit='day') self.datatable.putcell('DATE', ID, sDate) self.datatable.putcell('MASKLIST', ID, masklist) # polarization dependent arrays npol = self.datatable.getcell('NPOL', ID) self.datatable.putcell('STATISTICS', ID, stats_template[:npol]) self.datatable.putcell('FLAG', ID, flags_template[:npol]) self.datatable.putcell('FLAG_PERMANENT', ID, pflags_template[:npol]) self.datatable.putcell('FLAG_SUMMARY', ID, flag_summary_template[:npol]) self.datatable.putcell('TSYS', ID, tsys_template[:npol]) ID += 1 num_antenna = len(self.ms.antennas) self.vAnt += num_antenna self.appended_row = nrow # PIPE-646 & PIPE-647 # generate flag commands self.generate_flagcmd() return org_directions
def _get_outref(self): outref = None if self.ms.representative_target[0] is not None: # if ms has representative target, take reference from that LOG.info( 'Use direction reference for representative target "{0}".'.format(self.ms.representative_target[0])) representative_source_name = self.ms.representative_target[0] dirrefs = numpy.unique([f.mdirection['refer'] for f in self.ms.fields if f.source.name == representative_source_name]) if len(dirrefs) == 0: raise RuntimeError('Failed to get direction reference for representative source.') else: # if representative target is not given, take reference from one of the targets dirrefs = numpy.unique([f.mdirection['refer'] for f in self.ms.fields if 'TARGET' in f.intents]) if len(dirrefs) == 0: # no target field exists, something wrong raise RuntimeError('No TARGET field exists.') if len(dirrefs) == 1: outref = dirrefs[0] else: # direction reference is not unique, search desired ref if 'ICRS' in dirrefs: outref = 'ICRS' elif 'J2000' in dirrefs: outref = 'J2000' else: # use first one outref = dirrefs[0] if outref is None: raise RuntimeError('Failed to get direction reference for TARGET.') return outref @staticmethod def _get_azelref(): return 'AZELGEO'
[docs] def get_reference_direction( self, source_name, ephem_table, is_known_eph_obj, mepoch, mposition, outframe): me = casa_tools.measures if ephem_table != "": me.framecomet( ephem_table ) me.doframe(mepoch) me.doframe(mposition) obj_azel = me.measure( me.direction('COMET'), 'AZELGEO' ) ref = me.measure( obj_azel, outframe ) else: # if source_name.upper() in known_ephemeris_list: if is_known_eph_obj: me.doframe(mepoch) me.doframe(mposition) obj_azel = me.measure( me.direction(source_name.upper()), 'AZELGEO' ) ref = me.measure( obj_azel, outframe ) else: raise RuntimeError( "{0} is not registered in known_ephemeris_list".format(source_name) ) return ref