Source code for pipeline.hifa.cli.gotasks.hifa_gaincalsnr

##################### generated by xml-casa (v2) from hifa_gaincalsnr.xml ###########
##################### 39c1712a4c0c9bae4073a33f1b5d0830 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hifa.cli import hifa_gaincalsnr as _hifa_gaincalsnr_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

def static_var(varname, value):
    """Return a decorator that stores ``value`` on a function as attribute ``varname``.

    The decorated function is returned unchanged apart from the new
    attribute, so ``@static_var('state', {})`` gives the function a
    ``state`` attribute that persists between calls.

    Parameters:
        varname: name of the attribute to set on the decorated function.
        value: object assigned to that attribute (stored as-is, not copied).

    Returns:
        A decorator taking a function and returning that same function.
    """
    def decorate(func):
        setattr(func, varname, value)
        return func
    return decorate
class _hifa_gaincalsnr:
    """
    hifa_gaincalsnr ---- Compute gaincal signal to noise ratios per spw

    The gaincal solution signal to noise is determined as follows:

    o For each data set the list of source(s) to use for the per scan gaincal
      solution signal to noise estimation is compiled based on the values of
      the field, intent, and spw parameters.

    o Source fluxes are determined for each spw and source combination.
      o Fluxes in Jy are derived from the pipeline context.
      o Pipeline context fluxes are derived from the online flux calibrator
        catalog, the ASDM, or the user via the flux.csv file.
      o If no fluxes are available the task terminates.

    o Atmospheric calibration and observations scans are determined for each
      spw and source combination.
      o If intent is set to 'PHASE' and there are no atmospheric scans
        associated with the 'PHASE' calibrator, 'TARGET' atmospheric scans
        will be used instead.
      o If atmospheric scans cannot be associated with any of the spw and
        source combinations the task terminates.

    o Science spws are mapped to atmospheric spws for each science spw and
      source combinations.
      o If mappings cannot be determined for any of the spws the task
        terminates.

    o The median Tsys value for each atmospheric spw and source combination is
      determined from the SYSCAL table. Medians are computed first by channel,
      then by antenna, in order to reduce sensitivity to deviant values.

    o The science spw parameters, exposure time(s), and integration time(s)
      are determined.

    o The per scan sensitivity and signal to noise estimates are computed per
      science spectral window. Nominal Tsys and sensitivity values per
      receiver band provided by the ALMA project are used for this estimate.

    o The QA score is based on how many signal to noise estimates greater than
      the requested signal to noise ratio can be computed.

    Output

    results -- If pipeline mode is 'getinputs' then None is returned.
               Otherwise the results object for the pipeline task is returned.

    --------- parameter descriptions ---------------------------------------------

    vis            The list of input MeasurementSets. Defaults to the list of
                   MeasurementSets specified in the pipeline context.
                   example: vis=['M82A.ms', 'M82B.ms']
    field          The list of field names of sources to be used for signal to
                   noise estimation. Defaults to all fields with the standard
                   intent.
                   example: field='3C279'
    intent         A string containing a comma delimited list of intents
                   against which the selected fields are matched. Defaults to
                   'PHASE'.
                   example: intent='BANDPASS'
    spw            The list of spectral windows and channels for which gain
                   solutions are computed. Defaults to all the science
                   spectral windows for which there are both 'intent' and
                   TARGET intents.
                   example: spw='13,15'
    phasesnr       The required gaincal solution signal to noise.
                   example: phasesnr=20.0
    bwedgefrac     The fraction of the bandwidth edges that is flagged.
                   example: bwedgefrac=0.0
    hm_nantennas   The heuristics for determining the number of antennas to
                   use in the signal to noise estimate. The options are 'all'
                   and 'unflagged'. The 'unflagged' option is not currently
                   supported.
                   example: hm_nantennas='unflagged'
    maxfracflagged The maximum fraction of an antenna that can be flagged
                   before it is excluded from the signal to noise estimate.
                   example: maxfracflagged=0.80
    pipelinemode   The pipeline operating mode. In 'automatic' mode the
                   pipeline determines the values of all context defined
                   pipeline inputs automatically. In interactive mode the user
                   can set the pipeline context defined parameters manually.
                   In 'getinputs' mode the user can check the settings of all
                   pipeline parameters without running the task.
    dryrun         Run the task (False) or only display the command that
                   would be run without executing it (True).
    acceptresults  Add the results of the task to the pipeline context (True)
                   or reject them (False).

    --------- examples -----------------------------------------------------------

    1. Estimate the per scan gaincal solution sensitivities and signal to
       noise ratios for all the science spectral windows:

       hifa_gaincalsnr()
    """

    _info_group_ = """pipeline"""
    _info_desc_ = """Compute gaincal signal to noise ratios per spw"""

    # Per-parameter validation rules passed to the casatools validator.
    __schema = {
        'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list, _coerce.to_strvec]},
        'field': {'type': 'cStr', 'coerce': _coerce.to_str},
        'intent': {'type': 'cStr', 'coerce': _coerce.to_str},
        'spw': {'type': 'cStr', 'coerce': _coerce.to_str},
        'phasesnr': {'type': 'cFloat', 'coerce': _coerce.to_float},
        'bwedgefrac': {'type': 'cFloat', 'coerce': _coerce.to_float},
        'hm_nantennas': {'type': 'cStr', 'coerce': _coerce.to_str,
                         'allowed': ['all', 'unflagged']},
        'maxfracflagged': {'type': 'cFloat', 'coerce': _coerce.to_float},
        'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str,
                         'allowed': ['automatic', 'interactive', 'getinputs']},
        'dryrun': {'type': 'cBool'},
        'acceptresults': {'type': 'cBool'},
    }

    # (row template, number of non-printing escape characters in the template)
    # for the three kinds of row that inp() prints.
    __row_plain = ('%-17.17s = %s%-23s%s', 0)
    __row_bold = ('\x1B[1m\x1B[47m%-17.17s =\x1B[0m %s%-23s%s', 13)
    __row_sub = (' \x1B[92m%-14.14s =\x1B[0m %s%-23s%s', 9)

    def __init__(self):
        self.__stdout = None
        self.__stderr = None
        self.__root_frame_ = None

    def __globals_(self):
        """Return the CASAshell global frame, looking it up on first use."""
        if self.__root_frame_ is None:
            self.__root_frame_ = _find_frame()
            assert self.__root_frame_ is not None, "could not find CASAshell global frame"
        return self.__root_frame_

    def __to_string_(self, value):
        """Format a value for display; strings are wrapped in single quotes."""
        if isinstance(value, str):
            return "'%s'" % value
        return str(value)

    def __validate_(self, doc, schema):
        """Validate ``doc`` against ``schema`` with the casatools validator."""
        return _pc.validate(doc, schema)

    def __do_inp_output(self, param_prefix, description_str, formatting_chars):
        """Write one parameter row, wrapping its description to the terminal width.

        ``param_prefix`` is the already formatted ``name = value`` text and
        ``formatting_chars`` the number of non-printing escape characters in
        it, which are discounted when measuring the visible line length.
        Reads ``self.term_width``, which ``inp()`` sets before any row is
        printed.
        """
        out = self.__stdout or sys.stdout
        description = description_str.split()
        prefix_width = 23 + 17 + 4
        output = []
        addon = ''
        first_addon = True
        while description:
            if not output:
                # First line: carries the parameter name and value.
                if len(param_prefix) - formatting_chars > prefix_width - 1:
                    # Value too wide to share a line with the description.
                    output.append(param_prefix)
                    continue
                addon = param_prefix + ' #'
                first_addon = True
                addon_formatting = formatting_chars
            else:
                # Continuation line: indent past the name/value columns.
                addon = (' ' * prefix_width) + '#'
                first_addon = False
                addon_formatting = 0
            # If even the first word does not fit, give up on the description.
            if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                if first_addon:
                    # Still make sure the parameter itself is shown.
                    output.append(addon)
                break
            while description:
                # Stop filling this line once the next word would overflow.
                if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                    break
                addon = addon + ' ' + description[0]
                description.pop(0)
            output.append(addon)
        out.write('\n'.join(output) + '\n')

    def __show_row(self, name, value, description, row_style):
        """Validate ``value`` and print its row; invalid values are shown in red."""
        template, escape_chars = row_style
        valid = self.__validate_({name: value}, {name: self.__schema[name]})
        pre, post = ('', '') if valid else ('\x1B[91m', '\x1B[0m')
        self.__do_inp_output(template % (name, pre, self.__to_string_(value), post),
                             description, escape_chars + len(pre) + len(post))

    #--------- return nonsubparam values ----------------------------------------------
    def __bwedgefrac_dflt(self, glb):
        return 0.03125

    def __bwedgefrac(self, glb):
        return glb.get('bwedgefrac', 0.03125)

    def __hm_nantennas_dflt(self, glb):
        return 'all'

    def __hm_nantennas(self, glb):
        return glb.get('hm_nantennas', 'all')

    def __pipelinemode_dflt(self, glb):
        return 'automatic'

    def __pipelinemode(self, glb):
        return glb.get('pipelinemode', 'automatic')

    def __phasesnr_dflt(self, glb):
        return 25.0

    def __phasesnr(self, glb):
        return glb.get('phasesnr', 25.0)

    #--------- return inp/go default --------------------------------------------------
    # Each *_dflt returns None when the sub-parameter is not active for the
    # current value of its parent parameter (hm_nantennas or pipelinemode).
    def __maxfracflagged_dflt(self, glb):
        if self.__hm_nantennas(glb) == "unflagged":
            return 0.90
        return None

    def __dryrun_dflt(self, glb):
        if self.__pipelinemode(glb) == "interactive":
            return False
        return None

    def __field_dflt(self, glb):
        if self.__pipelinemode(glb) in ("interactive", "getinputs"):
            return ""
        return None

    def __intent_dflt(self, glb):
        if self.__pipelinemode(glb) in ("interactive", "getinputs"):
            return "PHASE"
        return None

    def __vis_dflt(self, glb):
        if self.__pipelinemode(glb) in ("interactive", "getinputs"):
            return []
        return None

    def __acceptresults_dflt(self, glb):
        if self.__pipelinemode(glb) == "interactive":
            return True
        return None

    def __spw_dflt(self, glb):
        if self.__pipelinemode(glb) in ("interactive", "getinputs"):
            return ""
        return None

    #--------- return subparam values -------------------------------------------------
    # Resolution order: explicit value in glb, then the mode-dependent
    # default, then a fixed fallback.
    def __vis(self, glb):
        if 'vis' in glb:
            return glb['vis']
        dflt = self.__vis_dflt(glb)
        return dflt if dflt is not None else []

    def __field(self, glb):
        if 'field' in glb:
            return glb['field']
        dflt = self.__field_dflt(glb)
        return dflt if dflt is not None else ''

    def __intent(self, glb):
        if 'intent' in glb:
            return glb['intent']
        dflt = self.__intent_dflt(glb)
        return dflt if dflt is not None else 'PHASE'

    def __spw(self, glb):
        if 'spw' in glb:
            return glb['spw']
        dflt = self.__spw_dflt(glb)
        return dflt if dflt is not None else ''

    def __maxfracflagged(self, glb):
        if 'maxfracflagged' in glb:
            return glb['maxfracflagged']
        dflt = self.__maxfracflagged_dflt(glb)
        return dflt if dflt is not None else 0.90

    def __dryrun(self, glb):
        if 'dryrun' in glb:
            return glb['dryrun']
        dflt = self.__dryrun_dflt(glb)
        return dflt if dflt is not None else False

    def __acceptresults(self, glb):
        if 'acceptresults' in glb:
            return glb['acceptresults']
        dflt = self.__acceptresults_dflt(glb)
        return dflt if dflt is not None else True

    #--------- subparam inp output ----------------------------------------------------
    # Sub-parameters are only listed while their mode-dependent default is
    # not None; the other parameters are always listed.
    def __vis_inp(self):
        glb = self.__globals_()
        if self.__vis_dflt(glb) is not None:
            self.__show_row('vis', self.__vis(glb),
                            'List of input MeasurementSets', self.__row_sub)

    def __field_inp(self):
        glb = self.__globals_()
        if self.__field_dflt(glb) is not None:
            self.__show_row('field', self.__field(glb),
                            'Set of data selection field names', self.__row_sub)

    def __intent_inp(self):
        glb = self.__globals_()
        if self.__intent_dflt(glb) is not None:
            self.__show_row('intent', self.__intent(glb),
                            'Set of data selection observing intents', self.__row_sub)

    def __spw_inp(self):
        glb = self.__globals_()
        if self.__spw_dflt(glb) is not None:
            self.__show_row('spw', self.__spw(glb),
                            'Set of data selection spectral window ids', self.__row_sub)

    def __phasesnr_inp(self):
        self.__show_row('phasesnr', self.__phasesnr(self.__globals_()),
                        'The signal to noise minimum', self.__row_plain)

    def __bwedgefrac_inp(self):
        self.__show_row('bwedgefrac', self.__bwedgefrac(self.__globals_()),
                        'The fraction of the bandwidth edge that is flagged',
                        self.__row_plain)

    def __hm_nantennas_inp(self):
        self.__show_row('hm_nantennas', self.__hm_nantennas(self.__globals_()),
                        'The antenna selection heuristic (unsupported)',
                        self.__row_bold)

    def __maxfracflagged_inp(self):
        glb = self.__globals_()
        if self.__maxfracflagged_dflt(glb) is not None:
            self.__show_row('maxfracflagged', self.__maxfracflagged(glb),
                            'The maximum fraction of data flagged per antenna (unsupported)',
                            self.__row_sub)

    def __pipelinemode_inp(self):
        self.__show_row('pipelinemode', self.__pipelinemode(self.__globals_()),
                        'The pipeline operating mode', self.__row_bold)

    def __dryrun_inp(self):
        glb = self.__globals_()
        if self.__dryrun_dflt(glb) is not None:
            self.__show_row('dryrun', self.__dryrun(glb),
                            'Run task (False) or display the command(True)',
                            self.__row_sub)

    def __acceptresults_inp(self):
        glb = self.__globals_()
        if self.__acceptresults_dflt(glb) is not None:
            self.__show_row('acceptresults', self.__acceptresults(glb),
                            'Automatically accept results into the context',
                            self.__row_sub)

    #--------- global default implementation-------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def set_global_defaults(self):
        """Remove every task parameter from the CASAshell globals.

        With the names gone, later lookups fall back to the built-in
        defaults. Also records this task as the last one used in the shared
        inp/go state.
        """
        self.set_global_defaults.state['last'] = self
        glb = self.__globals_()
        # The schema keys are exactly the task's parameter names.
        for name in self.__schema:
            glb.pop(name, None)

    #--------- inp function -----------------------------------------------------------
    def inp(self):
        """Print the current parameter values with their descriptions."""
        print("# hifa_gaincalsnr -- %s" % self._info_desc_)
        self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
        for show in (self.__vis_inp, self.__field_inp, self.__intent_inp,
                     self.__spw_inp, self.__phasesnr_inp, self.__bwedgefrac_inp,
                     self.__hm_nantennas_inp, self.__maxfracflagged_inp,
                     self.__pipelinemode_inp, self.__dryrun_inp,
                     self.__acceptresults_inp):
            show()

    #--------- tget function ----------------------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def tget(self, file=None):
        """Load saved parameter values into the CASAshell globals.

        Parameters:
            file: path of the parameter file to load. When None,
                "hifa_gaincalsnr.last" in the current directory is used.

        If no usable file is found a message is printed and the parameters
        are reset with set_global_defaults().
        """
        from casashell.private.stack_manip import find_frame
        from runpy import run_path
        filename = None
        if file is None:
            if os.path.isfile("hifa_gaincalsnr.last"):
                filename = "hifa_gaincalsnr.last"
        elif isinstance(file, str):
            if os.path.isfile(file):
                filename = file
        if filename is not None:
            glob = find_frame()
            # NOTE: run_path executes the file as Python code, so only
            # parameter files from a trusted source should be loaded.
            newglob = run_path(filename, init_globals={})
            glob.update(newglob)
            self.tget.state['last'] = self
        else:
            print("could not find last file, setting defaults instead...")
            self.set_global_defaults()

    def __call__(self, vis=None, field=None, intent=None, spw=None, phasesnr=None,
                 bwedgefrac=None, hm_nantennas=None, maxfracflagged=None,
                 pipelinemode=None, dryrun=None, acceptresults=None):
        """Run the task and return its result.

        When at least one argument is not None the call is treated as a
        Python-style invocation and unspecified parameters take their
        defaults. When every argument is None the values are read from the
        CASAshell globals (inp/go semantics).

        The resolved parameters are written to "hifa_gaincalsnr.pre", which
        is renamed to "hifa_gaincalsnr.last" after the task has run; both
        file operations are best-effort. If the underlying task raises, the
        error is posted to casalog and False is returned.
        """
        def noobj(s):
            # reprs such as "<object at 0x...>" cannot be read back; store None.
            if s.startswith('<') and s.endswith('>'):
                return "None"
            return s

        _prefile = os.path.realpath('hifa_gaincalsnr.pre')
        _postfile = os.path.realpath('hifa_gaincalsnr.last')
        _return_result_ = None
        _arguments = [vis, field, intent, spw, phasesnr, bwedgefrac, hm_nantennas,
                      maxfracflagged, pipelinemode, dryrun, acceptresults]
        _invocation_parameters = OrderedDict()
        if any(_arg is not None for _arg in _arguments):
            # invoke python style
            # set the non sub-parameters that are not None
            local_global = {}
            if phasesnr is not None:
                local_global['phasesnr'] = phasesnr
            if bwedgefrac is not None:
                local_global['bwedgefrac'] = bwedgefrac
            if hm_nantennas is not None:
                local_global['hm_nantennas'] = hm_nantennas
            if pipelinemode is not None:
                local_global['pipelinemode'] = pipelinemode

            # the non sub-parameters are resolved first because the
            # sub-parameter defaults depend on them
            _invocation_parameters['phasesnr'] = self.__phasesnr(local_global)
            _invocation_parameters['bwedgefrac'] = self.__bwedgefrac(local_global)
            _invocation_parameters['hm_nantennas'] = self.__hm_nantennas(local_global)
            _invocation_parameters['pipelinemode'] = self.__pipelinemode(local_global)

            # sub-parameters: the supplied value if not None, else the
            # getter, which picks the appropriate default
            _subparams = (
                ('vis', vis, self.__vis),
                ('field', field, self.__field),
                ('intent', intent, self.__intent),
                ('spw', spw, self.__spw),
                ('maxfracflagged', maxfracflagged, self.__maxfracflagged),
                ('dryrun', dryrun, self.__dryrun),
                ('acceptresults', acceptresults, self.__acceptresults),
            )
            for _name, _supplied, _resolve in _subparams:
                if _supplied is None:
                    _invocation_parameters[_name] = _resolve(_invocation_parameters)
                else:
                    _invocation_parameters[_name] = _supplied
        else:
            # invoke with inp/go semantics
            _glb = self.__globals_()
            _getters = (
                ('vis', self.__vis),
                ('field', self.__field),
                ('intent', self.__intent),
                ('spw', self.__spw),
                ('phasesnr', self.__phasesnr),
                ('bwedgefrac', self.__bwedgefrac),
                ('hm_nantennas', self.__hm_nantennas),
                ('maxfracflagged', self.__maxfracflagged),
                ('pipelinemode', self.__pipelinemode),
                ('dryrun', self.__dryrun),
                ('acceptresults', self.__acceptresults),
            )
            for _name, _resolve in _getters:
                _invocation_parameters[_name] = _resolve(_glb)

        # Saving the parameters is best-effort: a failure here must not stop
        # the task, but interpreter-exit signals are no longer swallowed.
        try:
            with open(_prefile, 'w') as _f:
                for _name, _value in _invocation_parameters.items():
                    _f.write("%-14s = %s\n" % (_name, noobj(repr(_value))))
                _f.write("#hifa_gaincalsnr( ")
                _f.write(",".join("%s=%s" % (_name, noobj(repr(_value)))
                                  for _name, _value in _invocation_parameters.items()))
                _f.write(" )\n")
        except Exception:
            pass

        try:
            _return_result_ = _hifa_gaincalsnr_t(
                _invocation_parameters['vis'],
                _invocation_parameters['field'],
                _invocation_parameters['intent'],
                _invocation_parameters['spw'],
                _invocation_parameters['phasesnr'],
                _invocation_parameters['bwedgefrac'],
                _invocation_parameters['hm_nantennas'],
                _invocation_parameters['maxfracflagged'],
                _invocation_parameters['pipelinemode'],
                _invocation_parameters['dryrun'],
                _invocation_parameters['acceptresults'])
        except Exception as e:
            from traceback import format_exc
            from casatasks import casalog
            casalog.origin('hifa_gaincalsnr')
            casalog.post("Exception Reported: Error in hifa_gaincalsnr: %s" % str(e), 'SEVERE')
            casalog.post(format_exc())
            _return_result_ = False

        # Promote the saved parameters to the ".last" file; also best-effort.
        try:
            os.rename(_prefile, _postfile)
        except OSError:
            pass
        return _return_result_


hifa_gaincalsnr = _hifa_gaincalsnr()