# Source code for pipeline.hif.cli.gotasks.hif_antpos

##################### generated by xml-casa (v2) from hif_antpos.xml ################
##################### ce31030a08a5f6ddc1d26565d5c6363d ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hif.cli import hif_antpos as _hif_antpos_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

def static_var(varname, value):
    """Build a decorator that stores ``value`` on a function as attribute ``varname``.

    The decorated function itself is returned unchanged (it is not wrapped),
    so the attribute is set on the original function object.

    Parameters
    ----------
    varname : str
        Name of the attribute to create on the decorated function.
    value : object
        Object to bind to that attribute. The same object is shared by every
        function decorated with the returned decorator.

    Returns
    -------
    callable
        A decorator taking a function and returning that same function.
    """
    def decorate(func):
        setattr(func, varname, value)
        return func
    return decorate
class _hif_antpos:
    """
    hif_antpos ---- Derive an antenna position calibration table

    Derive the antenna position calibration for list of MeasurementSets.

    The hif_antpos task corrects the antenna positions recorded in the ASDMs using
    updated antenna position calibration information determined after the
    observation was taken.

    Corrections can be input by hand, read from a file on disk, or in the future
    by querying an ALMA database service.

    The antenna positions file is in 'csv' format containing 6 comma-delimited
    columns as shown below. The default name of this file is 'antennapos.csv'.

    Contents of example 'antennapos.csv' file:

    ms,antenna,xoffset,yoffset,zoffset,comment
    uid___A002_X30a93d_X43e.ms,DV11,0.000,0.010,0.000,"No comment"
    uid___A002_X30a93d_X43e.dup.ms,DV11,0.000,-0.010,0.000,"No comment"

    The corrections are used to generate a calibration table which is recorded
    in the pipeline context and applied to the raw visibility data, on the fly to
    generate other calibration tables, or permanently to generate calibrated
    visibilities for imaging.

    Issues

    The hm_antpos 'online' option will be implemented when the observing system
    provides an antenna position determination service.

    Output

    results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
    the results object for the pipeline task is returned.

    --------- parameter descriptions ---------------------------------------------

    vis           List of input visibility files. Not available when
                  pipelinemode='automatic'.
                  example: ['ngc5921.ms']
    caltable      Name of output gain calibration tables. Not available when
                  pipelinemode='automatic'.
                  example: ['ngc5921.gcal']
    hm_antpos     Heuristics method for retrieving the antenna position
                  corrections. The options are 'online' (not yet implemented),
                  'manual', and 'file'.
    antenna       The list of antennas for which the positions are to be corrected.
                  Available when hm_antpos='manual'.
                  example: antenna='DV05,DV07'
    offsets       The list of antenna offsets for each antenna in 'antenna'.
                  Each offset is a set of 3 floating point numbers separated by
                  commas, specified in the ITRF frame. Available when
                  hm_antpos='manual'.
                  example: offsets=[0.01, 0.02, 0.03, 0.03, 0.02, 0.01]
    antposfile    The file(s) containing the antenna offsets. Used if hm_antpos
                  is 'file'.
                  example: 'antennapos.csv'
    pipelinemode  The pipeline operating mode. In 'automatic' mode the pipeline
                  determines the values of all context defined pipeline inputs
                  automatically. In interactive mode the user can set the
                  pipeline context defined parameters manually. In 'getinputs'
                  mode the user can check the settings of all pipeline
                  parameters without running the task.
    dryrun        Run the task (False) or list commands (True). Available when
                  pipelinemode='interactive'.
    acceptresults Add the results of the task to the pipeline context (True) or
                  reject them (False). Available when pipelinemode='interactive'.

    --------- examples -----------------------------------------------------------

    1. Correct the position of antenna 5 for all the visibility files in a single
    pipeline run:

    hif_antpos(antenna='DV05', offsets=[0.01, 0.02, 0.03])

    2. Correct the position of antennas for all the visibility files in a single
    pipeline run using antenna positions files on disk. These files are assumed
    to conform to a default naming scheme if 'antposfile' is unspecified by the
    user:

    hif_antpos(hm_antpos='file', antposfile='myantposfile.csv')
    """

    _info_group_ = """pipeline"""
    _info_desc_ = """Derive an antenna position calibration table"""

    # Per-parameter validation schema handed to _pc.validate(); one entry per
    # task parameter.
    __schema = {
        'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list, _coerce.to_strvec]},
        'caltable': {'type': 'cStrVec', 'coerce': [_coerce.to_list, _coerce.to_strvec]},
        'hm_antpos': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': ['manual', 'file', 'online']},
        'antenna': {'type': 'cStr', 'coerce': _coerce.to_str},
        'offsets': {'type': 'cFloatVec', 'coerce': [_coerce.to_list, _coerce.to_floatvec]},
        'antposfile': {'type': 'cStr', 'coerce': _coerce.to_str},
        'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': ['automatic', 'interactive', 'getinputs']},
        'dryrun': {'type': 'cBool'},
        'acceptresults': {'type': 'cBool'},
    }

    def __init__(self):
        self.__stdout = None
        self.__stderr = None
        self.__root_frame_ = None
        # inp() refreshes these from the real terminal; initialise them with the
        # same fallback so the output helper never hits a missing attribute.
        self.term_width, self.term_height = 80, 24

    def __globals_(self):
        """Return the CASAshell global frame, looking it up once and caching it."""
        if self.__root_frame_ is None:
            self.__root_frame_ = _find_frame()
            assert self.__root_frame_ is not None, "could not find CASAshell global frame"
        return self.__root_frame_

    def __to_string_(self, value):
        """Format ``value`` for display: strings are single-quoted, others use str()."""
        if isinstance(value, str):
            return "'%s'" % value
        return str(value)

    def __validate_(self, doc, schema):
        """Validate ``doc`` against ``schema`` via the casatools validator."""
        return _pc.validate(doc, schema)

    def __do_inp_output(self, param_prefix, description_str, formatting_chars):
        """Write one parameter line, word-wrapping its description to the terminal.

        Parameters
        ----------
        param_prefix : str
            Already formatted "name = value" text, including ANSI escapes.
        description_str : str
            Description; split on whitespace and wrapped after a '#'.
        formatting_chars : int
            Number of non-printing characters in ``param_prefix``; subtracted
            when measuring the visible width of the first line.
        """
        out = self.__stdout or sys.stdout
        description = description_str.split()
        prefix_width = 23 + 16 + 4
        output = []
        addon = ''
        first_addon = True
        while len(description) > 0:
            # starting a new line
            if len(output) == 0:
                # first line carries the parameter information; if the prefix
                # alone overflows its column, emit it by itself and continue
                # the description on the following line
                if len(param_prefix) - formatting_chars > prefix_width - 1:
                    output.append(param_prefix)
                    continue
                addon = param_prefix + ' #'
                first_addon = True
                addon_formatting = formatting_chars
            else:
                # subsequent lines are spaced over by the prefix width
                addon = (' ' * prefix_width) + '#'
                first_addon = False
                addon_formatting = 0
            # if the first word of the description puts us over the screen
            # width, bail
            if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                # if we are on the first line make sure it is still output
                if first_addon:
                    output.append(addon)
                break
            while len(description) > 0:
                # stop filling this line once the next word would overflow
                if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                    break
                addon = addon + ' ' + description[0]
                description.pop(0)
            output.append(addon)
        out.write('\n'.join(output) + '\n')

    def __inp_line(self, name, value, description, top_level):
        """Format and print the inp() line for a single parameter.

        The value is wrapped in the '\\x1B[91m' / '\\x1B[0m' escape pair when
        ``__validate_`` returns a falsy result for it.

        Parameters
        ----------
        name : str
            Parameter name; also the key used in ``__schema``.
        value : object
            Current value of the parameter.
        description : str
            Short description printed after the value.
        top_level : bool
            True for hm_antpos / pipelinemode, False for their sub-parameters.
        """
        if self.__validate_({name: value}, {name: self.__schema[name]}):
            pre, post = '', ''
        else:
            pre, post = '\x1B[91m', '\x1B[0m'
        if top_level:
            # 13 == len('\x1B[1m') + len('\x1B[47m') + len('\x1B[0m')
            template, escape_len = '\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s', 13
        else:
            # 9 == len('\x1B[92m') + len('\x1B[0m')
            # NOTE(review): the leading pad is a single space as seen in this
            # listing; the 16-wide top-level name column suggests it may have
            # been wider before whitespace was collapsed - confirm against the
            # generator output.
            template, escape_len = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s', 9
        self.__do_inp_output(
            template % (name, pre, self.__to_string_(value), post),
            description,
            escape_len + len(pre) + len(post),
        )

    #--------- return nonsubparam values ----------------------------------------------

    def __pipelinemode_dflt(self, glb):
        return 'automatic'

    def __pipelinemode(self, glb):
        if 'pipelinemode' in glb:
            return glb['pipelinemode']
        return 'automatic'

    def __hm_antpos_dflt(self, glb):
        return 'manual'

    def __hm_antpos(self, glb):
        if 'hm_antpos' in glb:
            return glb['hm_antpos']
        return 'manual'

    #--------- return inp/go default --------------------------------------------------
    # Each *_dflt returns the default of a sub-parameter, or None when the
    # sub-parameter is not exposed for the current hm_antpos / pipelinemode.

    def __antenna_dflt(self, glb):
        if self.__hm_antpos(glb) == "manual":
            return ""
        return None

    def __dryrun_dflt(self, glb):
        if self.__pipelinemode(glb) == "interactive":
            return bool(False)
        return None

    def __antposfile_dflt(self, glb):
        if self.__hm_antpos(glb) == "file":
            return ""
        return None

    def __vis_dflt(self, glb):
        if self.__pipelinemode(glb) == "interactive":
            return []
        if self.__pipelinemode(glb) == "getinputs":
            return []
        return None

    def __acceptresults_dflt(self, glb):
        if self.__pipelinemode(glb) == "interactive":
            return bool(True)
        return None

    def __offsets_dflt(self, glb):
        if self.__hm_antpos(glb) == "manual":
            return []
        return None

    def __caltable_dflt(self, glb):
        if self.__pipelinemode(glb) == "interactive":
            return []
        if self.__pipelinemode(glb) == "getinputs":
            return []
        return None

    #--------- return subparam values -------------------------------------------------
    # Lookup order: explicit value in glb, then the mode-dependent default,
    # then a fixed fallback.

    def __vis(self, glb):
        if 'vis' in glb:
            return glb['vis']
        dflt = self.__vis_dflt(glb)
        if dflt is not None:
            return dflt
        return []

    def __caltable(self, glb):
        if 'caltable' in glb:
            return glb['caltable']
        dflt = self.__caltable_dflt(glb)
        if dflt is not None:
            return dflt
        return []

    def __antenna(self, glb):
        if 'antenna' in glb:
            return glb['antenna']
        dflt = self.__antenna_dflt(glb)
        if dflt is not None:
            return dflt
        return ''

    def __offsets(self, glb):
        if 'offsets' in glb:
            return glb['offsets']
        dflt = self.__offsets_dflt(glb)
        if dflt is not None:
            return dflt
        return []

    def __antposfile(self, glb):
        if 'antposfile' in glb:
            return glb['antposfile']
        dflt = self.__antposfile_dflt(glb)
        if dflt is not None:
            return dflt
        return ''

    def __dryrun(self, glb):
        if 'dryrun' in glb:
            return glb['dryrun']
        dflt = self.__dryrun_dflt(glb)
        if dflt is not None:
            return dflt
        return False

    def __acceptresults(self, glb):
        if 'acceptresults' in glb:
            return glb['acceptresults']
        dflt = self.__acceptresults_dflt(glb)
        if dflt is not None:
            return dflt
        return True

    #--------- subparam inp output ----------------------------------------------------
    # Sub-parameters are listed only when their *_dflt is not None; the two
    # top-level parameters are always listed.

    def __vis_inp(self):
        glb = self.__globals_()
        if self.__vis_dflt(glb) is not None:
            self.__inp_line('vis', self.__vis(glb), 'List of input MeasurementSets', False)

    def __caltable_inp(self):
        glb = self.__globals_()
        if self.__caltable_dflt(glb) is not None:
            self.__inp_line('caltable', self.__caltable(glb), 'List of output caltable(s)', False)

    def __hm_antpos_inp(self):
        value = self.__hm_antpos(self.__globals_())
        self.__inp_line('hm_antpos', value, 'The antenna position determination method', True)

    def __antenna_inp(self):
        glb = self.__globals_()
        if self.__antenna_dflt(glb) is not None:
            self.__inp_line('antenna', self.__antenna(glb), 'List of antennas to be corrected', False)

    def __offsets_inp(self):
        glb = self.__globals_()
        if self.__offsets_dflt(glb) is not None:
            self.__inp_line('offsets', self.__offsets(glb),
                            'List of position corrections, one set per antenna', False)

    def __antposfile_inp(self):
        glb = self.__globals_()
        if self.__antposfile_dflt(glb) is not None:
            self.__inp_line('antposfile', self.__antposfile(glb),
                            'File containing antenna position corrections', False)

    def __pipelinemode_inp(self):
        value = self.__pipelinemode(self.__globals_())
        self.__inp_line('pipelinemode', value, 'The pipeline operation mode', True)

    def __dryrun_inp(self):
        glb = self.__globals_()
        if self.__dryrun_dflt(glb) is not None:
            self.__inp_line('dryrun', self.__dryrun(glb),
                            'Run the task (False) or list commands (True)', False)

    def __acceptresults_inp(self):
        glb = self.__globals_()
        if self.__acceptresults_dflt(glb) is not None:
            self.__inp_line('acceptresults', self.__acceptresults(glb),
                            'Automatically accept results into context', False)

    #--------- global default implementation-------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def set_global_defaults(self):
        """Remove every task parameter from the shell globals so defaults apply.

        Also records this task object under the 'last' key of the shared
        inp/go state.
        """
        self.set_global_defaults.state['last'] = self
        glb = self.__globals_()
        for name in ('antenna', 'dryrun', 'pipelinemode', 'antposfile', 'hm_antpos',
                     'vis', 'acceptresults', 'offsets', 'caltable'):
            if name in glb:
                del glb[name]

    #--------- inp function -----------------------------------------------------------
    def inp(self):
        """Print the current parameter settings, wrapped to the terminal width."""
        print("# hif_antpos -- %s" % self._info_desc_)
        self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
        self.__vis_inp()
        self.__caltable_inp()
        self.__hm_antpos_inp()
        self.__antenna_inp()
        self.__offsets_inp()
        self.__antposfile_inp()
        self.__pipelinemode_inp()
        self.__dryrun_inp()
        self.__acceptresults_inp()

    #--------- tget function ----------------------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def tget(self, file=None):
        """Load parameter values from a saved file into the shell globals.

        Parameters
        ----------
        file : str, optional
            Path of the file to load. When None, 'hif_antpos.last' in the
            current directory is used if it exists.

        If no usable file is found a message is printed and the global
        defaults are restored instead.
        """
        from casashell.private.stack_manip import find_frame
        from runpy import run_path
        filename = None
        if file is None:
            if os.path.isfile("hif_antpos.last"):
                filename = "hif_antpos.last"
        elif isinstance(file, str):
            if os.path.isfile(file):
                filename = file
        if filename is not None:
            glob = find_frame()
            # NOTE(review): run_path executes the file as Python code and every
            # resulting global is copied into the shell namespace; only load
            # files from trusted locations.
            newglob = run_path(filename, init_globals={})
            for i in newglob:
                glob[i] = newglob[i]
            self.tget.state['last'] = self
        else:
            print("could not find last file, setting defaults instead...")
            self.set_global_defaults()

    def __call__(self, vis=None, caltable=None, hm_antpos=None, antenna=None, offsets=None,
                 antposfile=None, pipelinemode=None, dryrun=None, acceptresults=None):
        """Run the hif_antpos pipeline task.

        If at least one argument is not None the call is treated as a normal
        Python call and unspecified parameters take their defaults. If every
        argument is None, values are read from the CASAshell globals (inp/go
        semantics).

        The resolved parameters are written to 'hif_antpos.pre', which is
        renamed to 'hif_antpos.last' after the task has been attempted. Both
        file operations are best-effort.

        Returns
        -------
        object
            Whatever the underlying task returns, or False if it raised an
            exception (the exception is reported through casalog).
        """
        def noobj(s):
            # reprs such as '<... object at 0x...>' cannot be read back in, so
            # they are written out as None
            if s.startswith('<') and s.endswith('>'):
                return "None"
            return s

        _prefile = os.path.realpath('hif_antpos.pre')
        _postfile = os.path.realpath('hif_antpos.last')
        _return_result_ = None
        _arguments = [vis, caltable, hm_antpos, antenna, offsets, antposfile,
                      pipelinemode, dryrun, acceptresults]
        _invocation_parameters = OrderedDict()
        if any(arg is not None for arg in _arguments):
            # invoke python style
            # set the non sub-parameters that are not None
            local_global = {}
            if hm_antpos is not None:
                local_global['hm_antpos'] = hm_antpos
            if pipelinemode is not None:
                local_global['pipelinemode'] = pipelinemode

            # the invocation parameters for the non-subparameters can now be
            # set - this picks up those defaults
            _invocation_parameters['hm_antpos'] = self.__hm_antpos(local_global)
            _invocation_parameters['pipelinemode'] = self.__pipelinemode(local_global)

            # the sub-parameters can then be set. Use the supplied value if not
            # None, else the function, which gets the appropriate default
            _invocation_parameters['vis'] = self.__vis(_invocation_parameters) if vis is None else vis
            _invocation_parameters['caltable'] = self.__caltable(_invocation_parameters) if caltable is None else caltable
            _invocation_parameters['antenna'] = self.__antenna(_invocation_parameters) if antenna is None else antenna
            _invocation_parameters['offsets'] = self.__offsets(_invocation_parameters) if offsets is None else offsets
            _invocation_parameters['antposfile'] = self.__antposfile(_invocation_parameters) if antposfile is None else antposfile
            _invocation_parameters['dryrun'] = self.__dryrun(_invocation_parameters) if dryrun is None else dryrun
            _invocation_parameters['acceptresults'] = self.__acceptresults(_invocation_parameters) if acceptresults is None else acceptresults
        else:
            # invoke with inp/go semantics
            _invocation_parameters['vis'] = self.__vis(self.__globals_())
            _invocation_parameters['caltable'] = self.__caltable(self.__globals_())
            _invocation_parameters['hm_antpos'] = self.__hm_antpos(self.__globals_())
            _invocation_parameters['antenna'] = self.__antenna(self.__globals_())
            _invocation_parameters['offsets'] = self.__offsets(self.__globals_())
            _invocation_parameters['antposfile'] = self.__antposfile(self.__globals_())
            _invocation_parameters['pipelinemode'] = self.__pipelinemode(self.__globals_())
            _invocation_parameters['dryrun'] = self.__dryrun(self.__globals_())
            _invocation_parameters['acceptresults'] = self.__acceptresults(self.__globals_())

        # Record the invocation; failing to write the file must not stop the
        # task, but interpreter-exit signals are no longer swallowed.
        try:
            with open(_prefile, 'w') as _f:
                for _i in _invocation_parameters:
                    _f.write("%-13s = %s\n" % (_i, noobj(repr(_invocation_parameters[_i]))))
                _f.write("#hif_antpos( ")
                _f.write(",".join("%s=%s" % (_i, noobj(repr(_invocation_parameters[_i])))
                                  for _i in _invocation_parameters))
                _f.write(" )\n")
        except Exception:
            pass

        try:
            _return_result_ = _hif_antpos_t(
                _invocation_parameters['vis'],
                _invocation_parameters['caltable'],
                _invocation_parameters['hm_antpos'],
                _invocation_parameters['antenna'],
                _invocation_parameters['offsets'],
                _invocation_parameters['antposfile'],
                _invocation_parameters['pipelinemode'],
                _invocation_parameters['dryrun'],
                _invocation_parameters['acceptresults'],
            )
        except Exception as e:
            from traceback import format_exc
            from casatasks import casalog
            casalog.origin('hif_antpos')
            casalog.post("Exception Reported: Error in hif_antpos: %s" % str(e), 'SEVERE')
            casalog.post(format_exc())
            _return_result_ = False

        # Promote the .pre file to .last; best-effort (the .pre file may not
        # exist if the write above failed).
        try:
            os.rename(_prefile, _postfile)
        except OSError:
            pass
        return _return_result_


hif_antpos = _hif_antpos()