Source code for pipeline.hif.cli.gotasks.hif_lowgainflag

##################### generated by xml-casa (v2) from hif_lowgainflag.xml ###########
##################### 1250a2cca5c90bb575947d5a79626b17 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hif.cli import hif_lowgainflag as _hif_lowgainflag_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

[docs]def static_var(varname, value): def decorate(func): setattr(func, varname, value) return func return decorate
class _hif_lowgainflag: """ hif_lowgainflag ---- Flag antennas with low or high gain Flag antennas with unusually low or high gain. Deviant antennas are detected by analysis of a view showing their calibration gains. This view is a list of 2D images with axes 'Time' and 'Antenna'; there is one image for each spectral window and intent. A flagcmd to flag all data for an antenna will be generated by any gain that is outside the range [fnm_lo_limit * median, fnm_hi_limit * median]. Output results -- If pipeline mode is 'getinputs' then None is returned. Otherwise the results object for the pipeline task is returned. --------- parameter descriptions --------------------------------------------- vis The list of input MeasurementSets. Defaults to the list of MeasurementSets specified in the h_init or hif_importdata task. '': use all MeasurementSets in the context Examples: 'ngc5921.ms', ['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms'] intent A string containing the list of intents to be checked for antennas with deviant gains. The default is blank, which causes the task to select the 'BANDPASS' intent. spw The list of spectral windows and channels to which the calibration will be applied. Defaults to all science windows in the pipeline context. Examples: spw='17', spw='11, 15' refant A string containing a prioritized list of reference antenna name(s) to be used to produce the gain table. Defaults to the value(s) stored in the pipeline context. If undefined in the pipeline context defaults to the CASA reference antenna naming scheme. Examples: refant='DV01', refant='DV06,DV07' flag_nmedian True to flag figures of merit greater than fnm_hi_limit * median or lower than fnm_lo_limit * median. fnm_lo_limit Flag values lower than fnm_lo_limit * median fnm_hi_limit Flag values higher than fnm_hi_limit * median pipelinemode The pipeline operating mode. In 'automatic' mode the pipeline determines the values of all context defined pipeline inputs automatically. In 'interactive' mode the user can set the pipeline context defined parameters manually. In 'getinputs' mode the user can check the settings of all pipeline parameters without running the task. dryrun Run the task (False) or just display the command (True) acceptresults Add the results of the task to the pipeline context (True) or reject them (False). --------- examples ----------------------------------------------------------- """ _info_group_ = """pipeline""" _info_desc_ = """Flag antennas with low or high gain""" __schema = {'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'intent': {'type': 'cStr', 'coerce': _coerce.to_str}, 'spw': {'type': 'cStr', 'coerce': _coerce.to_str}, 'refant': {'type': 'cStr', 'coerce': _coerce.to_str}, 'flag_nmedian': {'type': 'cBool'}, 'fnm_lo_limit': {'type': 'cFloat', 'coerce': _coerce.to_float}, 'fnm_hi_limit': {'type': 'cFloat', 'coerce': _coerce.to_float}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}} def __init__(self): self.__stdout = None self.__stderr = None self.__root_frame_ = None def __globals_(self): if self.__root_frame_ is None: self.__root_frame_ = _find_frame( ) assert self.__root_frame_ is not None, "could not find CASAshell global frame" return self.__root_frame_ def __to_string_(self,value): if type(value) is str: return "'%s'" % value else: return str(value) def __validate_(self,doc,schema): return _pc.validate(doc,schema) def __do_inp_output(self,param_prefix,description_str,formatting_chars): out = self.__stdout or sys.stdout description = description_str.split( ) prefix_width = 23 + 16 + 4 output = [ ] addon = '' first_addon = True while len(description) > 0: ## starting a new line..................................................................... if len(output) == 0: ## for first line add parameter information............................................ if len(param_prefix)-formatting_chars > prefix_width - 1: output.append(param_prefix) continue addon = param_prefix + ' #' first_addon = True addon_formatting = formatting_chars else: ## for subsequent lines space over prefix width........................................ addon = (' ' * prefix_width) + '#' first_addon = False addon_formatting = 0 ## if first word of description puts us over the screen width, bail........................ if len(addon + description[0]) - addon_formatting + 1 > self.term_width: ## if we're doing the first line make sure it's output................................. if first_addon: output.append(addon) break while len(description) > 0: ## if the next description word puts us over break for the next line................... if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break addon = addon + ' ' + description[0] description.pop(0) output.append(addon) out.write('\n'.join(output) + '\n') #--------- return nonsubparam values ---------------------------------------------- def __flag_nmedian_dflt( self, glb ): return True def __flag_nmedian( self, glb ): if 'flag_nmedian' in glb: return glb['flag_nmedian'] return True def __pipelinemode_dflt( self, glb ): return 'automatic' def __pipelinemode( self, glb ): if 'pipelinemode' in glb: return glb['pipelinemode'] return 'automatic' #--------- return inp/go default -------------------------------------------------- def __dryrun_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return bool(False) return None def __intent_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return "" if self.__pipelinemode( glb ) == "getinputs": return "" return None def __fnm_hi_limit_dflt( self, glb ): if self.__flag_nmedian( glb ) == bool(True): return float(1.5) return None def __refant_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return "" if self.__pipelinemode( glb ) == "getinputs": return "" return None def __vis_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return [] if self.__pipelinemode( glb ) == "getinputs": return [] return None def __acceptresults_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return bool(True) return None def __fnm_lo_limit_dflt( self, glb ): if self.__flag_nmedian( glb ) == bool(True): return float(0.5) return None def __spw_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return "" if self.__pipelinemode( glb ) == "getinputs": return "" return None #--------- return subparam values ------------------------------------------------- def __vis( self, glb ): if 'vis' in glb: return glb['vis'] dflt = self.__vis_dflt( glb ) if dflt is not None: return dflt return [ ] def __intent( self, glb ): if 'intent' in glb: return glb['intent'] dflt = self.__intent_dflt( glb ) if dflt is not None: return dflt return '' def __spw( self, glb ): if 'spw' in glb: return glb['spw'] dflt = self.__spw_dflt( glb ) if dflt is not None: return dflt return '' def __refant( self, glb ): if 'refant' in glb: return glb['refant'] dflt = self.__refant_dflt( glb ) if dflt is not None: return dflt return '' def __fnm_lo_limit( self, glb ): if 'fnm_lo_limit' in glb: return glb['fnm_lo_limit'] dflt = self.__fnm_lo_limit_dflt( glb ) if dflt is not None: return dflt return float(0.5) def __fnm_hi_limit( self, glb ): if 'fnm_hi_limit' in glb: return glb['fnm_hi_limit'] dflt = self.__fnm_hi_limit_dflt( glb ) if dflt is not None: return dflt return float(1.5) def __dryrun( self, glb ): if 'dryrun' in glb: return glb['dryrun'] dflt = self.__dryrun_dflt( glb ) if dflt is not None: return dflt return False def __acceptresults( self, glb ): if 'acceptresults' in glb: return glb['acceptresults'] dflt = self.__acceptresults_dflt( glb ) if dflt is not None: return dflt return True #--------- subparam inp output ---------------------------------------------------- def __vis_inp(self): if self.__vis_dflt( self.__globals_( ) ) is not None: description = 'List of input MeasurementSets' value = self.__vis( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'vis': value},{'vis': self.__schema['vis']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('vis',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __intent_inp(self): if self.__intent_dflt( self.__globals_( ) ) is not None: description = 'Data intent whose gains are to checked' value = self.__intent( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'intent': value},{'intent': self.__schema['intent']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('intent',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __spw_inp(self): if self.__spw_dflt( self.__globals_( ) ) is not None: description = 'Spectral window ids whose gains are to be checked' value = self.__spw( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'spw': value},{'spw': self.__schema['spw']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('spw',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __refant_inp(self): if self.__refant_dflt( self.__globals_( ) ) is not None: description = 'Reference antenna names' value = self.__refant( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'refant': value},{'refant': self.__schema['refant']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('refant',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __flag_nmedian_inp(self): description = 'True to flag values outside range [fnm_lo_limit * median, fnm_hi_limit*nmedian]' value = self.__flag_nmedian( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'flag_nmedian': value},{'flag_nmedian': self.__schema['flag_nmedian']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output('\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % ('flag_nmedian',pre,self.__to_string_(value),post),description,13+len(pre)+len(post)) def __fnm_lo_limit_inp(self): if self.__fnm_lo_limit_dflt( self.__globals_( ) ) is not None: description = 'Flag values lower than fnm_lo_limit * median' value = self.__fnm_lo_limit( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'fnm_lo_limit': value},{'fnm_lo_limit': self.__schema['fnm_lo_limit']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('fnm_lo_limit',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __fnm_hi_limit_inp(self): if self.__fnm_hi_limit_dflt( self.__globals_( ) ) is not None: description = 'Flag values higher than fnm_hi_limit * median' value = self.__fnm_hi_limit( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'fnm_hi_limit': value},{'fnm_hi_limit': self.__schema['fnm_hi_limit']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('fnm_hi_limit',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __pipelinemode_inp(self): description = 'The pipeline operations mode' value = self.__pipelinemode( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output('\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post),description,13+len(pre)+len(post)) def __dryrun_inp(self): if self.__dryrun_dflt( self.__globals_( ) ) is not None: description = 'Run the task (False) or just display the command (True)' value = self.__dryrun( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __acceptresults_inp(self): if self.__acceptresults_dflt( self.__globals_( ) ) is not None: description = 'Add the results to the pipeline context' value = self.__acceptresults( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) #--------- global default implementation------------------------------------------- @static_var('state', __sf__('casa_inp_go_state')) def set_global_defaults(self): self.set_global_defaults.state['last'] = self glb = self.__globals_( ) if 'dryrun' in glb: del glb['dryrun'] if 'pipelinemode' in glb: del glb['pipelinemode'] if 'flag_nmedian' in glb: del glb['flag_nmedian'] if 'intent' in glb: del glb['intent'] if 'fnm_hi_limit' in glb: del glb['fnm_hi_limit'] if 'refant' in glb: del glb['refant'] if 'vis' in glb: del glb['vis'] if 'acceptresults' in glb: del glb['acceptresults'] if 'fnm_lo_limit' in glb: del glb['fnm_lo_limit'] if 'spw' in glb: del glb['spw'] #--------- inp function ----------------------------------------------------------- def inp(self): print("# hif_lowgainflag -- %s" % self._info_desc_) self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24)) self.__vis_inp( ) self.__intent_inp( ) self.__spw_inp( ) self.__refant_inp( ) self.__flag_nmedian_inp( ) self.__fnm_lo_limit_inp( ) self.__fnm_hi_limit_inp( ) self.__pipelinemode_inp( ) self.__dryrun_inp( ) self.__acceptresults_inp( ) #--------- tget function ---------------------------------------------------------- @static_var('state', __sf__('casa_inp_go_state')) def tget(self,file=None): from casashell.private.stack_manip import find_frame from runpy import run_path filename = None if file is None: if os.path.isfile("hif_lowgainflag.last"): filename = "hif_lowgainflag.last" elif isinstance(file, str): if os.path.isfile(file): filename = file if filename is not None: glob = find_frame( ) newglob = run_path( filename, init_globals={ } ) for i in newglob: glob[i] = newglob[i] self.tget.state['last'] = self else: print("could not find last file, setting defaults instead...") self.set_global_defaults( ) def __call__( self, vis=None, intent=None, spw=None, refant=None, flag_nmedian=None, fnm_lo_limit=None, fnm_hi_limit=None, pipelinemode=None, dryrun=None, acceptresults=None ): def noobj(s): if s.startswith('<') and s.endswith('>'): return "None" else: return s _prefile = os.path.realpath('hif_lowgainflag.pre') _postfile = os.path.realpath('hif_lowgainflag.last') _return_result_ = None _arguments = [vis,intent,spw,refant,flag_nmedian,fnm_lo_limit,fnm_hi_limit,pipelinemode,dryrun,acceptresults] _invocation_parameters = OrderedDict( ) if any(map(lambda x: x is not None,_arguments)): # invoke python style # set the non sub-parameters that are not None local_global = { } if flag_nmedian is not None: local_global['flag_nmedian'] = flag_nmedian if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode # the invocation parameters for the non-subparameters can now be set - this picks up those defaults _invocation_parameters['flag_nmedian'] = self.__flag_nmedian( local_global ) _invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global ) # the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default _invocation_parameters['vis'] = self.__vis( _invocation_parameters ) if vis is None else vis _invocation_parameters['intent'] = self.__intent( _invocation_parameters ) if intent is None else intent _invocation_parameters['spw'] = self.__spw( _invocation_parameters ) if spw is None else spw _invocation_parameters['refant'] = self.__refant( _invocation_parameters ) if refant is None else refant _invocation_parameters['fnm_lo_limit'] = self.__fnm_lo_limit( _invocation_parameters ) if fnm_lo_limit is None else fnm_lo_limit _invocation_parameters['fnm_hi_limit'] = self.__fnm_hi_limit( _invocation_parameters ) if fnm_hi_limit is None else fnm_hi_limit _invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun _invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults else: # invoke with inp/go semantics _invocation_parameters['vis'] = self.__vis( self.__globals_( ) ) _invocation_parameters['intent'] = self.__intent( self.__globals_( ) ) _invocation_parameters['spw'] = self.__spw( self.__globals_( ) ) _invocation_parameters['refant'] = self.__refant( self.__globals_( ) ) _invocation_parameters['flag_nmedian'] = self.__flag_nmedian( self.__globals_( ) ) _invocation_parameters['fnm_lo_limit'] = self.__fnm_lo_limit( self.__globals_( ) ) _invocation_parameters['fnm_hi_limit'] = self.__fnm_hi_limit( self.__globals_( ) ) _invocation_parameters['pipelinemode'] = self.__pipelinemode( self.__globals_( ) ) _invocation_parameters['dryrun'] = self.__dryrun( self.__globals_( ) ) _invocation_parameters['acceptresults'] = self.__acceptresults( self.__globals_( ) ) try: with open(_prefile,'w') as _f: for _i in _invocation_parameters: _f.write("%-13s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i])))) _f.write("#hif_lowgainflag( ") count = 0 for _i in _invocation_parameters: _f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i])))) count += 1 if count < len(_invocation_parameters): _f.write(",") _f.write(" )\n") except: pass try: _return_result_ = _hif_lowgainflag_t( _invocation_parameters['vis'],_invocation_parameters['intent'],_invocation_parameters['spw'],_invocation_parameters['refant'],_invocation_parameters['flag_nmedian'],_invocation_parameters['fnm_lo_limit'],_invocation_parameters['fnm_hi_limit'],_invocation_parameters['pipelinemode'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] ) except Exception as e: from traceback import format_exc from casatasks import casalog casalog.origin('hif_lowgainflag') casalog.post("Exception Reported: Error in hif_lowgainflag: %s" % str(e),'SEVERE') casalog.post(format_exc( )) _return_result_ = False try: os.rename(_prefile,_postfile) except: pass return _return_result_ hif_lowgainflag = _hif_lowgainflag( )