# Source code for pipeline.hif.cli.gotasks.hpc_hif_refant

##################### generated by xml-casa (v2) from hpc_hif_refant.xml ############
##################### 9ed0f0f5cd1e23734390ccb652efc11f ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hif.cli import hpc_hif_refant as _hpc_hif_refant_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

def static_var(varname, value):
    """Return a decorator that stores ``value`` on a function as ``varname``.

    The decorated function itself is returned (it is not wrapped), so the
    attribute is reachable both from the plain function and, for methods,
    through the bound method object.

    Args:
        varname: Name of the attribute to set on the decorated function.
        value: Object to store under that attribute.

    Returns:
        A decorator taking a function and returning that same function.
    """
    def decorate(func):
        setattr(func, varname, value)
        return func
    return decorate
class _hpc_hif_refant:
    """
    hpc_hif_refant ---- Select the best reference antennas

    The hpc_hif_refant task selects a list of reference antennas and stores them
    in the pipeline context in priority order.

    The priority order is determined by a weighted combination of scores derived
    by the antenna selection heuristics. In manual mode the reference antennas
    can be set by hand.

    Output:

    results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
               the results object for the pipeline task is returned.

    Issues

    --------- parameter descriptions ---------------------------------------------

    vis           The list of input MeasurementSets. Defaults to the list of
                  MeasurementSets in the pipeline context.
                  Not available when pipelinemode='automatic'.
                  example: ['M31.ms']
    field         The comma delimited list of field names or field ids for which
                  flagging scores are computed if hm_refant='automatic' and
                  flagging = True.
                  Not available when pipelinemode='automatic'.
                  example: '' (Default to fields with the specified intents),
                  '3C279', '3C279,M82'
    intent        A string containing a comma delimited list of intents against
                  which the selected fields are matched. Defaults to all
                  supported intents.
                  Not available when pipelinemode='automatic'.
                  example: 'BANDPASS', 'AMPLI,BANDPASS,PHASE'
    spw           A string containing the comma delimited list of spectral
                  window ids for which flagging scores are computed if
                  hm_refant='automatic' and flagging = True.
                  Not available when pipelinemode='automatic'.
                  example: '' (all spws observed with the specified intents),
                  '11,13,15,17'
    hm_refant     The heuristics method or mode for selecting the reference
                  antenna. The options are 'manual' and 'automatic'. In manual
                  mode a user supplied reference antenna refant is supplied.
                  In 'automatic' mode the antennas are selected automatically.
    refant        The user supplied reference antenna for hm_refant='manual'.
                  If no antenna list is supplied an empty list is returned.
                  example: 'DV05'
    refantignore  string list to be ignored as reference antennas.
                  example: refantignore='ea02,ea03'
    geometry      Score antenna by proximity to the center of the array. This
                  option is quick as only the ANTENNA table must be read.
                  Parameter is available when hm_refant='automatic'.
    flagging      Score antennas by percentage of unflagged data. This option
                  requires computing flagging statistics.
                  Parameter is available when hm_refant='automatic'.
    pipelinemode  The pipeline operating mode. In 'automatic' mode the pipeline
                  determines the values of all context defined pipeline inputs
                  automatically. In interactive mode the user can set the
                  pipeline context defined parameters manually. In 'getinputs'
                  mode the user can check the settings of all pipeline
                  parameters without running the task.
    dryrun        Run the task (False) or display the command (True).
                  Available when pipelinemode='interactive'.
    acceptresults Add the results of the task to the pipeline context (True) or
                  reject them (False).
                  Available when pipelinemode='interactive'.
    parallel      Execute using CASA HPC functionality, if available.

    --------- examples -----------------------------------------------------------

    1. Compute the references antennas to be used for bandpass and gain
       calibration.

       hpc_hif_refant()
    """

    _info_group_ = """pipeline"""
    _info_desc_ = """Select the best reference antennas"""

    # Validation/coercion schema, keyed by parameter name, handed to _pc.validate.
    __schema = {'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'field': {'type': 'cStr', 'coerce': _coerce.to_str}, 'intent': {'type': 'cStr', 'coerce': _coerce.to_str}, 'spw': {'type': 'cStr', 'coerce': _coerce.to_str}, 'hm_refant': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'manual' ]}, 'refant': {'type': 'cStr', 'coerce': _coerce.to_str}, 'refantignore': {'type': 'cStr', 'coerce': _coerce.to_str}, 'geometry': {'type': 'cBool'}, 'flagging': {'type': 'cBool'}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}, 'parallel': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'true', 'false' ]}}

    def __init__(self):
        self.__stdout = None
        self.__stderr = None
        self.__root_frame_ = None

    def __globals_(self):
        """Return the CASAshell global frame, looking it up on first use."""
        if self.__root_frame_ is None:
            self.__root_frame_ = _find_frame( )
            assert self.__root_frame_ is not None, "could not find CASAshell global frame"
        return self.__root_frame_

    def __to_string_(self, value):
        """Render ``value`` for display; strings are wrapped in single quotes."""
        if isinstance(value, str):
            return "'%s'" % value
        return str(value)

    def __validate_(self, doc, schema):
        """Validate ``doc`` against ``schema`` with the casatools validator."""
        return _pc.validate(doc, schema)

    def __do_inp_output(self, param_prefix, description_str, formatting_chars):
        """Write one parameter line, word-wrapping its description.

        Args:
            param_prefix: Already formatted "name = value" text, possibly
                containing terminal escape sequences.
            description_str: Free-text description; it is split on whitespace
                and re-flowed after a '#' so lines fit in ``self.term_width``.
            formatting_chars: Number of non-printing characters in
                ``param_prefix``; subtracted when measuring line widths.

        ``self.term_width`` must have been set (``inp`` does this) before
        this method is called.
        """
        out = self.__stdout or sys.stdout
        description = description_str.split( )
        prefix_width = 23 + 16 + 4
        output = [ ]
        if not description:
            # Nothing to wrap: still show the parameter itself. Without this
            # a parameter with an empty description printed as a blank line.
            output.append(param_prefix)
        addon = ''
        first_addon = True
        while description:
            ## starting a new line
            if not output:
                ## for first line add parameter information
                if len(param_prefix) - formatting_chars > prefix_width - 1:
                    # prefix alone is too wide; description starts on the next line
                    output.append(param_prefix)
                    continue
                addon = param_prefix + ' #'
                first_addon = True
                addon_formatting = formatting_chars
            else:
                ## for subsequent lines space over prefix width
                addon = (' ' * prefix_width) + '#'
                first_addon = False
                addon_formatting = 0
            ## if first word of description puts us over the screen width, bail
            if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                ## if we're doing the first line make sure it's output
                if first_addon:
                    output.append(addon)
                break
            while description:
                ## if the next description word puts us over break for the next line
                if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                    break
                addon = addon + ' ' + description[0]
                description.pop(0)
            output.append(addon)
        out.write('\n'.join(output) + '\n')

    #--------- shared helpers ---------------------------------------------------------
    @staticmethod
    def __resolve(glb, name, dflt_fn, fallback):
        """Return ``glb[name]`` if set, else the non-None default, else ``fallback``."""
        if name in glb:
            return glb[name]
        dflt = dflt_fn(glb)
        return fallback if dflt is None else dflt

    def __highlight(self, name, value):
        """Return (prefix, suffix) escapes: empty if ``value`` validates, red otherwise."""
        if self.__validate_({name: value}, {name: self.__schema[name]}):
            return ('', '')
        return ('\x1B[91m', '\x1B[0m')

    def __show_param(self, name, value, description):
        """Print a top-level (bold) parameter line."""
        pre, post = self.__highlight(name, value)
        self.__do_inp_output('\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % (name,pre,self.__to_string_(value),post),description,13+len(pre)+len(post))

    def __show_subparam(self, name, value, description):
        """Print an indented (green) sub-parameter line.

        The three leading spaces plus the 13-column label match the 16-column
        top-level label, so the '#' of the first line falls in the same column
        as the '#' that __do_inp_output puts on continuation lines.
        """
        pre, post = self.__highlight(name, value)
        self.__do_inp_output('   \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % (name,pre,self.__to_string_(value),post),description,9+len(pre)+len(post))

    #--------- return nonsubparam values ----------------------------------------------
    def __intent_dflt( self, glb ):
        return 'AMPLITUDE,BANDPASS,PHASE'

    def __refantignore_dflt( self, glb ):
        return ''

    def __pipelinemode_dflt( self, glb ):
        return 'automatic'

    def __pipelinemode( self, glb ):
        if 'pipelinemode' in glb:
            return glb['pipelinemode']
        return 'automatic'

    def __hm_refant_dflt( self, glb ):
        return 'automatic'

    def __hm_refant( self, glb ):
        if 'hm_refant' in glb:
            return glb['hm_refant']
        return 'automatic'

    #--------- return inp/go default --------------------------------------------------
    # Each *_dflt returns the default while the parameter is applicable for the
    # current pipelinemode / hm_refant, and None when it is not applicable.
    def __dryrun_dflt( self, glb ):
        if self.__pipelinemode( glb ) == "interactive":
            return bool(False)
        return None

    def __flagging_dflt( self, glb ):
        if self.__hm_refant( glb ) == "automatic":
            return bool(True)
        return None

    def __field_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return ""
        return None

    def __refant_dflt( self, glb ):
        if self.__hm_refant( glb ) == "manual":
            return ""
        return None

    def __vis_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return []
        return None

    def __acceptresults_dflt( self, glb ):
        if self.__pipelinemode( glb ) == "interactive":
            return bool(True)
        return None

    def __spw_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return ""
        return None

    def __parallel_dflt( self, glb ):
        if self.__pipelinemode( glb ) == "interactive":
            return "automatic"
        return None

    def __geometry_dflt( self, glb ):
        if self.__hm_refant( glb ) == "automatic":
            return bool(True)
        return None

    #--------- return subparam values -------------------------------------------------
    def __vis( self, glb ):
        return self.__resolve(glb, 'vis', self.__vis_dflt, [ ])

    def __field( self, glb ):
        return self.__resolve(glb, 'field', self.__field_dflt, '')

    def __intent( self, glb ):
        return self.__resolve(glb, 'intent', self.__intent_dflt, 'AMPLITUDE,BANDPASS,PHASE')

    def __spw( self, glb ):
        return self.__resolve(glb, 'spw', self.__spw_dflt, '')

    def __refant( self, glb ):
        return self.__resolve(glb, 'refant', self.__refant_dflt, '')

    def __refantignore( self, glb ):
        return self.__resolve(glb, 'refantignore', self.__refantignore_dflt, '')

    def __geometry( self, glb ):
        return self.__resolve(glb, 'geometry', self.__geometry_dflt, True)

    def __flagging( self, glb ):
        return self.__resolve(glb, 'flagging', self.__flagging_dflt, True)

    def __dryrun( self, glb ):
        return self.__resolve(glb, 'dryrun', self.__dryrun_dflt, False)

    def __acceptresults( self, glb ):
        return self.__resolve(glb, 'acceptresults', self.__acceptresults_dflt, True)

    def __parallel( self, glb ):
        return self.__resolve(glb, 'parallel', self.__parallel_dflt, 'automatic')

    #--------- subparam inp output ----------------------------------------------------
    # A sub-parameter is only listed while its *_dflt is not None.
    def __vis_inp(self):
        glb = self.__globals_( )
        if self.__vis_dflt( glb ) is not None:
            self.__show_subparam('vis', self.__vis( glb ), 'List of input MeasurementSets')

    def __field_inp(self):
        glb = self.__globals_( )
        if self.__field_dflt( glb ) is not None:
            self.__show_subparam('field', self.__field( glb ), 'List of field names or ids')

    def __intent_inp(self):
        glb = self.__globals_( )
        if self.__intent_dflt( glb ) is not None:
            self.__show_subparam('intent', self.__intent( glb ), 'List of data selection intents')

    def __spw_inp(self):
        glb = self.__globals_( )
        if self.__spw_dflt( glb ) is not None:
            self.__show_subparam('spw', self.__spw( glb ), 'List of spectral windows ids')

    def __hm_refant_inp(self):
        self.__show_param('hm_refant', self.__hm_refant( self.__globals_( ) ), 'The reference antenna heuristics mode')

    def __refant_inp(self):
        glb = self.__globals_( )
        if self.__refant_dflt( glb ) is not None:
            self.__show_subparam('refant', self.__refant( glb ), 'List of reference antennas')

    def __refantignore_inp(self):
        glb = self.__globals_( )
        if self.__refantignore_dflt( glb ) is not None:
            self.__show_subparam('refantignore', self.__refantignore( glb ), 'String list of antennas to ignore')

    def __geometry_inp(self):
        glb = self.__globals_( )
        if self.__geometry_dflt( glb ) is not None:
            self.__show_subparam('geometry', self.__geometry( glb ), 'Score by proximity to center of the array')

    def __flagging_inp(self):
        glb = self.__globals_( )
        if self.__flagging_dflt( glb ) is not None:
            self.__show_subparam('flagging', self.__flagging( glb ), 'Score by percentage of good data')

    def __pipelinemode_inp(self):
        self.__show_param('pipelinemode', self.__pipelinemode( self.__globals_( ) ), 'The pipeline operating mode')

    def __dryrun_inp(self):
        glb = self.__globals_( )
        if self.__dryrun_dflt( glb ) is not None:
            self.__show_subparam('dryrun', self.__dryrun( glb ), 'Run the task (False) or display the command (True)')

    def __acceptresults_inp(self):
        glb = self.__globals_( )
        if self.__acceptresults_dflt( glb ) is not None:
            self.__show_subparam('acceptresults', self.__acceptresults( glb ), 'Add the results into the pipeline context')

    def __parallel_inp(self):
        glb = self.__globals_( )
        if self.__parallel_dflt( glb ) is not None:
            self.__show_subparam('parallel', self.__parallel( glb ), '')

    #--------- global default implementation-------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def set_global_defaults(self):
        """Remove this task's parameters from the shell globals and mark it as last task."""
        self.set_global_defaults.state['last'] = self
        glb = self.__globals_( )
        for name in ('dryrun', 'refantignore', 'flagging', 'field', 'pipelinemode',
                     'intent', 'refant', 'vis', 'acceptresults', 'hm_refant',
                     'spw', 'parallel', 'geometry'):
            if name in glb:
                del glb[name]

    #--------- inp function -----------------------------------------------------------
    def inp(self):
        """Print the current value and description of every applicable parameter."""
        print("# hpc_hif_refant -- %s" % self._info_desc_)
        self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
        self.__vis_inp( )
        self.__field_inp( )
        self.__intent_inp( )
        self.__spw_inp( )
        self.__hm_refant_inp( )
        self.__refant_inp( )
        self.__refantignore_inp( )
        self.__geometry_inp( )
        self.__flagging_inp( )
        self.__pipelinemode_inp( )
        self.__dryrun_inp( )
        self.__acceptresults_inp( )
        self.__parallel_inp( )

    #--------- tget function ----------------------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def tget(self,file=None):
        """Load parameter values from a saved file into the shell globals.

        Args:
            file: Path of the file to load. When None, "hpc_hif_refant.last"
                in the current directory is used if it exists.

        If no usable file is found a message is printed and the global
        defaults are restored instead.
        """
        from casashell.private.stack_manip import find_frame
        from runpy import run_path
        filename = None
        if file is None:
            if os.path.isfile("hpc_hif_refant.last"):
                filename = "hpc_hif_refant.last"
        elif isinstance(file, str):
            if os.path.isfile(file):
                filename = file
        if filename is not None:
            glob = find_frame( )
            # NOTE: run_path executes the file as Python code; only load
            # files you trust.
            newglob = run_path( filename, init_globals={ } )
            for key, val in newglob.items( ):
                # run_path also returns module bookkeeping (__name__, __file__,
                # __spec__, ...); copying those would overwrite the shell's own.
                if key.startswith('__') and key.endswith('__'):
                    continue
                glob[key] = val
            self.tget.state['last'] = self
        else:
            print("could not find last file, setting defaults instead...")
            self.set_global_defaults( )

    def __call__( self, vis=None, field=None, intent=None, spw=None, hm_refant=None, refant=None, refantignore=None, geometry=None, flagging=None, pipelinemode=None, dryrun=None, acceptresults=None, parallel=None ):
        """Run the hpc_hif_refant pipeline task.

        If at least one argument is not None the call is treated as a normal
        Python call: missing arguments get their defaults. If every argument
        is None, all values are taken from the shell globals (inp/go style).

        The parameters used are written to "hpc_hif_refant.pre" before the
        task runs and that file is renamed to "hpc_hif_refant.last" afterwards;
        both steps are best effort.

        Returns:
            Whatever the underlying task returns, or False if it raised an
            exception (the exception is reported through casalog).
        """
        def noobj(s):
            # repr() of arbitrary objects looks like "<...>"; store None instead
            if s.startswith('<') and s.endswith('>'):
                return "None"
            return s

        _prefile = os.path.realpath('hpc_hif_refant.pre')
        _postfile = os.path.realpath('hpc_hif_refant.last')
        _return_result_ = None
        _arguments = [vis,field,intent,spw,hm_refant,refant,refantignore,geometry,flagging,pipelinemode,dryrun,acceptresults,parallel]
        _invocation_parameters = OrderedDict( )
        if any(_arg is not None for _arg in _arguments):
            # invoke python style
            # set the non sub-parameters that are not None
            local_global = { }
            if hm_refant is not None:
                local_global['hm_refant'] = hm_refant
            if pipelinemode is not None:
                local_global['pipelinemode'] = pipelinemode

            # the invocation parameters for the non-subparameters can now be set - this picks up those defaults
            _invocation_parameters['hm_refant'] = self.__hm_refant( local_global )
            _invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )

            # the sub-parameters can then be set. Use the supplied value if not None,
            # else the function, which gets the appropriate default
            for _name, _supplied, _getter in (
                    ('vis', vis, self.__vis),
                    ('field', field, self.__field),
                    ('intent', intent, self.__intent),
                    ('spw', spw, self.__spw),
                    ('refant', refant, self.__refant),
                    ('refantignore', refantignore, self.__refantignore),
                    ('geometry', geometry, self.__geometry),
                    ('flagging', flagging, self.__flagging),
                    ('dryrun', dryrun, self.__dryrun),
                    ('acceptresults', acceptresults, self.__acceptresults),
                    ('parallel', parallel, self.__parallel)):
                _invocation_parameters[_name] = _getter( _invocation_parameters ) if _supplied is None else _supplied
        else:
            # invoke with inp/go semantics
            _glb = self.__globals_( )
            for _name, _getter in (
                    ('vis', self.__vis),
                    ('field', self.__field),
                    ('intent', self.__intent),
                    ('spw', self.__spw),
                    ('hm_refant', self.__hm_refant),
                    ('refant', self.__refant),
                    ('refantignore', self.__refantignore),
                    ('geometry', self.__geometry),
                    ('flagging', self.__flagging),
                    ('pipelinemode', self.__pipelinemode),
                    ('dryrun', self.__dryrun),
                    ('acceptresults', self.__acceptresults),
                    ('parallel', self.__parallel)):
                _invocation_parameters[_name] = _getter( _glb )

        try:
            with open(_prefile,'w') as _f:
                _rendered = [(_i, noobj(repr(_v))) for _i, _v in _invocation_parameters.items( )]
                for _i, _text in _rendered:
                    _f.write("%-13s = %s\n" % (_i, _text))
                _f.write("#hpc_hif_refant( ")
                _f.write(",".join("%s=%s" % _pair for _pair in _rendered))
                _f.write(" )\n")
        except Exception:
            # Recording the invocation is best effort; never block the task on it.
            pass

        try:
            # the task takes its parameters positionally, in this order
            _return_result_ = _hpc_hif_refant_t( *[_invocation_parameters[_name] for _name in (
                'vis', 'field', 'intent', 'spw', 'hm_refant', 'refant', 'refantignore',
                'geometry', 'flagging', 'pipelinemode', 'dryrun', 'acceptresults', 'parallel')] )
        except Exception as e:
            from traceback import format_exc
            from casatasks import casalog
            casalog.origin('hpc_hif_refant')
            casalog.post("Exception Reported: Error in hpc_hif_refant: %s" % str(e),'SEVERE')
            casalog.post(format_exc( ))
            _return_result_ = False

        try:
            os.rename(_prefile,_postfile)
        except OSError:
            # e.g. the .pre file could not be written above
            pass
        return _return_result_


hpc_hif_refant = _hpc_hif_refant( )