Source code for pipeline.h.cli.gotasks.hpc_h_applycal

##################### generated by xml-casa (v2) from hpc_h_applycal.xml ############
##################### 2b27e8f4d9f3b02aadb3cee0133f4859 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.h.cli import hpc_h_applycal as _hpc_h_applycal_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

def static_var(varname, value):
    """Build a decorator that stores ``value`` on a function as attribute ``varname``.

    The decorated function object itself is returned (it is not wrapped), so
    the attribute can afterwards be read as ``func.<varname>``.
    """
    def _attach(target):
        # Annotate the function in place and hand the same object back.
        setattr(target, varname, value)
        return target

    return _attach
class _hpc_h_applycal:
    """
    hpc_h_applycal ---- Apply the calibration(s) to the data

    Apply precomputed calibrations to the data.

    hif_applycal applies the precomputed calibration tables stored in the pipeline
    context to the set of visibility files using predetermined field and
    spectral window maps and default values for the interpolation schemes.

    Users can interact with the pipeline calibration state using the tasks
    hif_export_calstate and hif_import_calstate.

    Issues

    There is some discussion about the appropriate values of calwt. Given
    properly scaled data, the correct value should be the CASA default of True.
    However at the current time ALMA is suggesting that calwt be set to True for
    applying observatory calibrations, e.g. antenna positions, WVR, and system
    temperature corrections, and to False for applying instrument calibrations,
    e.g. bandpass, gain, and flux.

    Output:

    results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
    the results object for the pipeline task is returned

    --------- parameter descriptions ---------------------------------------------

    vis             The list of input MeasurementSets. Defaults to the list of
                    MeasurementSets in the pipeline context. Parameter is not
                    available when pipelinemode='automatic'.
                    example: ['X227.ms']
    field           A string containing the list of field names or field ids to
                    which the calibration will be applied. Defaults to all fields
                    in the pipeline context. Parameter is not available when
                    pipelinemode='automatic'.
                    example: '3C279', '3C279, M82'
    intent          A string containing the list of intents against which the
                    selected fields will be matched. Defaults to all supported
                    intents in the pipeline context. Parameter is not available
                    when pipelinemode='automatic'.
                    example: '*TARGET*'
    spw             The list of spectral windows and channels to which the
                    calibration will be applied. Defaults to all science windows
                    in the pipeline context. Parameter is not available when
                    pipelinemode='automatic'.
                    example: '17', '11, 15'
    antenna         The list of antennas to which the calibration will be applied.
                    Defaults to all antennas. Not currently supported. Parameter
                    is not available when pipelinemode='automatic'.
    applymode       Calibration apply mode
                    ''='calflagstrict': calibrate data and apply flags from
                        solutions using the strict flagging convention
                    'trial': report on flags from solutions, dataset entirely
                        unchanged
                    'flagonly': apply flags from solutions only, data not
                        calibrated
                    'calonly': calibrate data only, flags from solutions NOT
                        applied
                    'calflagstrict':
                    'flagonlystrict':same as above except flag spws for which
                        calibration is unavailable in one or more tables (instead
                        of allowing them to pass uncalibrated and unflagged)
    flagbackup      Backup the flags before the apply
    flagsum         Compute before and after flagging summary statistics
    flagdetailedsum Compute detailed before and after flagging statistics
                    summaries. Parameter available only when flagsum is True.
    pipelinemode    The pipeline operating mode. In 'automatic' mode the pipeline
                    determines the values of all context defined pipeline inputs
                    automatically. In interactive mode the user can set the
                    pipeline context defined parameters manually. In 'getinputs'
                    mode the user can check the settings of all pipeline
                    parameters without running the task.
    dryrun          Run task (False) or display the command(True). Parameter is
                    available only when pipelinemode='interactive'.
    acceptresults   Add the results of the task to the pipeline context (True) or
                    reject them (False). Parameter is available only when
                    pipelinemode='interactive'.
    parallel        Execute using CASA HPC functionality, if available.

    --------- examples -----------------------------------------------------------

    1. Apply the calibration to the target data

    h_session_applycal(intent='TARGET')

    """

    _info_group_ = """pipeline"""
    _info_desc_ = """Apply the calibration(s) to the data"""

    # Validation/coercion schema, one entry per task parameter.
    __schema = {
        'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list, _coerce.to_strvec]},
        'field': {'type': 'cStr', 'coerce': _coerce.to_str},
        'intent': {'type': 'cStr', 'coerce': _coerce.to_str},
        'spw': {'type': 'cStr', 'coerce': _coerce.to_str},
        'antenna': {'type': 'cStr', 'coerce': _coerce.to_str},
        'applymode': {'type': 'cStr', 'coerce': _coerce.to_str,
                      'allowed': ['calonly', 'flagonlystrict', 'calflag', 'flagonly',
                                  'trial', '', 'calflagstrict']},
        'flagbackup': {'type': 'cBool'},
        'flagsum': {'type': 'cBool'},
        'flagdetailedsum': {'type': 'cBool'},
        'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str,
                         'allowed': ['automatic', 'interactive', 'getinputs']},
        'dryrun': {'type': 'cBool'},
        'acceptresults': {'type': 'cBool'},
        'parallel': {'type': 'cStr', 'coerce': _coerce.to_str,
                     'allowed': ['automatic', 'true', 'false']},
    }

    # Positional order in which parameters are handed to the underlying task;
    # it is also the signature order of __call__.
    __task_args_ = ('vis', 'field', 'intent', 'spw', 'antenna', 'applymode',
                    'flagbackup', 'flagsum', 'flagdetailedsum', 'pipelinemode',
                    'dryrun', 'acceptresults', 'parallel')

    # inp() line templates as (format, number of non-printing ANSI characters).
    # Each yields an 18-column name field so '=' and the trailing '#' line up
    # with prefix_width in __do_inp_output.
    __plain_fmt_ = ('%-18.18s = %s%-23s%s', 0)
    __expandable_fmt_ = ('\x1B[1m\x1B[47m%-18.18s =\x1B[0m %s%-23s%s', 13)
    # Sub-parameters are indented by three spaces: 3 + 15 == 18.
    __subparam_fmt_ = ('   \x1B[92m%-15.15s =\x1B[0m %s%-23s%s', 9)

    def __init__(self):
        self.__stdout = None
        self.__stderr = None
        self.__root_frame_ = None

    def __globals_(self):
        """Return the CASAshell global frame, looking it up once and caching it."""
        if self.__root_frame_ is None:
            self.__root_frame_ = _find_frame( )
            assert self.__root_frame_ is not None, "could not find CASAshell global frame"
        return self.__root_frame_

    def __to_string_(self, value):
        """Render a parameter value for inp(); strings are shown single-quoted."""
        if isinstance(value, str):
            return "'%s'" % value
        return str(value)

    def __validate_(self, doc, schema):
        """Validate ``doc`` against ``schema`` with the casatools validator."""
        return _pc.validate(doc, schema)

    def __do_inp_output(self, param_prefix, description_str, formatting_chars):
        """Write one inp() entry, wrapping the description to ``self.term_width``.

        param_prefix     -- already formatted "name = value" text
        description_str  -- description, emitted after a '#' and word-wrapped
        formatting_chars -- number of non-printing characters in param_prefix,
                            subtracted when measuring the visible width

        ``self.term_width`` must have been set (inp() does this) before calling.
        """
        out = self.__stdout or sys.stdout
        description = description_str.split( )
        prefix_width = 23 + 18 + 4
        output = [ ]
        addon = ''
        first_addon = True
        while len(description) > 0:
            ## starting a new line
            if len(output) == 0:
                ## for first line add parameter information
                if len(param_prefix) - formatting_chars > prefix_width - 1:
                    # prefix too wide to share a line with the description:
                    # emit it alone and start the description on the next line
                    output.append(param_prefix)
                    continue
                addon = param_prefix + ' #'
                first_addon = True
                addon_formatting = formatting_chars
            else:
                ## for subsequent lines space over prefix width
                addon = (' ' * prefix_width) + '#'
                first_addon = False
                addon_formatting = 0
            ## if first word of description puts us over the screen width, bail
            if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                ## if we're doing the first line make sure it's output
                if first_addon:
                    output.append(addon)
                break
            while len(description) > 0:
                ## if the next description word puts us over break for the next line
                if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                    break
                addon = addon + ' ' + description[0]
                description.pop(0)
            output.append(addon)
        out.write('\n'.join(output) + '\n')

    def __show_param_(self, name, value, description, style):
        """Print one parameter line for inp(); invalid values are wrapped in red.

        ``style`` is one of the (template, formatting_chars) class constants.
        """
        template, formatting_chars = style
        is_valid = self.__validate_({name: value}, {name: self.__schema[name]})
        (pre, post) = ('', '') if is_valid else ('\x1B[91m', '\x1B[0m')
        self.__do_inp_output(template % (name, pre, self.__to_string_(value), post),
                             description, formatting_chars + len(pre) + len(post))

    #--------- return nonsubparam values ----------------------------------------------
    def __applymode_dflt( self, glb ):
        return ''

    def __applymode( self, glb ):
        if 'applymode' in glb:
            return glb['applymode']
        return ''

    def __flagsum_dflt( self, glb ):
        return True

    def __flagsum( self, glb ):
        if 'flagsum' in glb:
            return glb['flagsum']
        return True

    def __pipelinemode_dflt( self, glb ):
        return ''

    def __pipelinemode( self, glb ):
        if 'pipelinemode' in glb:
            return glb['pipelinemode']
        return ''

    #--------- return inp/go default --------------------------------------------------
    # Each *_dflt returns the sub-parameter's default when its controlling
    # parameter enables it, and None when the sub-parameter is not active.
    def __antenna_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return ""
        return None

    def __dryrun_dflt( self, glb ):
        if self.__pipelinemode( glb ) == "interactive":
            return False
        return None

    def __field_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return ""
        return None

    def __intent_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return ""
        return None

    def __vis_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return []
        return None

    def __acceptresults_dflt( self, glb ):
        if self.__pipelinemode( glb ) == "interactive":
            return True
        return None

    def __flagdetailedsum_dflt( self, glb ):
        # Equality (not identity) on purpose: matches the generated comparison.
        if self.__flagsum( glb ) == True:  # noqa: E712
            return False
        return None

    def __flagbackup_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return True
        return None

    def __spw_dflt( self, glb ):
        if self.__pipelinemode( glb ) in ("interactive", "getinputs"):
            return ""
        return None

    def __parallel_dflt( self, glb ):
        if self.__pipelinemode( glb ) == "interactive":
            return "automatic"
        return None

    #--------- return subparam values -------------------------------------------------
    # Lookup order: explicit value in glb, then the active default, then a
    # fixed fallback used when the sub-parameter is inactive.
    def __vis( self, glb ):
        if 'vis' in glb:
            return glb['vis']
        dflt = self.__vis_dflt( glb )
        if dflt is not None:
            return dflt
        return [ ]

    def __field( self, glb ):
        if 'field' in glb:
            return glb['field']
        dflt = self.__field_dflt( glb )
        if dflt is not None:
            return dflt
        return ''

    def __intent( self, glb ):
        if 'intent' in glb:
            return glb['intent']
        dflt = self.__intent_dflt( glb )
        if dflt is not None:
            return dflt
        return ''

    def __spw( self, glb ):
        if 'spw' in glb:
            return glb['spw']
        dflt = self.__spw_dflt( glb )
        if dflt is not None:
            return dflt
        return ''

    def __antenna( self, glb ):
        if 'antenna' in glb:
            return glb['antenna']
        dflt = self.__antenna_dflt( glb )
        if dflt is not None:
            return dflt
        return ''

    def __flagbackup( self, glb ):
        if 'flagbackup' in glb:
            return glb['flagbackup']
        dflt = self.__flagbackup_dflt( glb )
        if dflt is not None:
            return dflt
        return True

    def __flagdetailedsum( self, glb ):
        if 'flagdetailedsum' in glb:
            return glb['flagdetailedsum']
        dflt = self.__flagdetailedsum_dflt( glb )
        if dflt is not None:
            return dflt
        return False

    def __dryrun( self, glb ):
        if 'dryrun' in glb:
            return glb['dryrun']
        dflt = self.__dryrun_dflt( glb )
        if dflt is not None:
            return dflt
        return False

    def __acceptresults( self, glb ):
        if 'acceptresults' in glb:
            return glb['acceptresults']
        dflt = self.__acceptresults_dflt( glb )
        if dflt is not None:
            return dflt
        return True

    def __parallel( self, glb ):
        if 'parallel' in glb:
            return glb['parallel']
        dflt = self.__parallel_dflt( glb )
        if dflt is not None:
            return dflt
        return 'automatic'

    #--------- subparam inp output ----------------------------------------------------
    # Sub-parameters are listed only while active (their *_dflt is not None).
    def __vis_inp(self):
        glb = self.__globals_( )
        if self.__vis_dflt( glb ) is not None:
            self.__show_param_('vis', self.__vis( glb ),
                               'List of input MeasurementSets', self.__subparam_fmt_)

    def __field_inp(self):
        glb = self.__globals_( )
        if self.__field_dflt( glb ) is not None:
            self.__show_param_('field', self.__field( glb ),
                               'Set of data selection field names or ids', self.__subparam_fmt_)

    def __intent_inp(self):
        glb = self.__globals_( )
        if self.__intent_dflt( glb ) is not None:
            self.__show_param_('intent', self.__intent( glb ),
                               'Set of data selection observing intents', self.__subparam_fmt_)

    def __spw_inp(self):
        glb = self.__globals_( )
        if self.__spw_dflt( glb ) is not None:
            self.__show_param_('spw', self.__spw( glb ),
                               'Set of data selection spectral window/channels', self.__subparam_fmt_)

    def __antenna_inp(self):
        glb = self.__globals_( )
        if self.__antenna_dflt( glb ) is not None:
            self.__show_param_('antenna', self.__antenna( glb ),
                               'Set of data selection antenna ids', self.__subparam_fmt_)

    def __applymode_inp(self):
        self.__show_param_('applymode', self.__applymode( self.__globals_( ) ),
                           'Calibration mode: "calflagstrict","calflag","calflagstrict","trial","flagonly","flagonlystrict", or "calonly"',
                           self.__plain_fmt_)

    def __flagbackup_inp(self):
        glb = self.__globals_( )
        if self.__flagbackup_dflt( glb ) is not None:
            self.__show_param_('flagbackup', self.__flagbackup( glb ),
                               'Backup the flags before the apply', self.__subparam_fmt_)

    def __flagsum_inp(self):
        self.__show_param_('flagsum', self.__flagsum( self.__globals_( ) ),
                           'Compute before and after flagging summary statistics',
                           self.__expandable_fmt_)

    def __flagdetailedsum_inp(self):
        glb = self.__globals_( )
        if self.__flagdetailedsum_dflt( glb ) is not None:
            self.__show_param_('flagdetailedsum', self.__flagdetailedsum( glb ),
                               'Compute before and after flagging summary statistics',
                               self.__subparam_fmt_)

    def __pipelinemode_inp(self):
        self.__show_param_('pipelinemode', self.__pipelinemode( self.__globals_( ) ),
                           'The pipeline operating mode', self.__expandable_fmt_)

    def __dryrun_inp(self):
        glb = self.__globals_( )
        if self.__dryrun_dflt( glb ) is not None:
            self.__show_param_('dryrun', self.__dryrun( glb ),
                               'Run task (False) or display the command(True)', self.__subparam_fmt_)

    def __acceptresults_inp(self):
        glb = self.__globals_( )
        if self.__acceptresults_dflt( glb ) is not None:
            self.__show_param_('acceptresults', self.__acceptresults( glb ),
                               'Automatically accept results into the context', self.__subparam_fmt_)

    def __parallel_inp(self):
        glb = self.__globals_( )
        if self.__parallel_dflt( glb ) is not None:
            self.__show_param_('parallel', self.__parallel( glb ),
                               'Execute using CASA HPC functionality, if available.',
                               self.__subparam_fmt_)

    #--------- global default implementation-------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def set_global_defaults(self):
        """Remove every task parameter from the shell globals so defaults apply."""
        self.set_global_defaults.state['last'] = self
        glb = self.__globals_( )
        for name in self.__task_args_:
            if name in glb:
                del glb[name]

    #--------- inp function -----------------------------------------------------------
    def inp(self):
        """Print the current parameter values with their descriptions."""
        print("# hpc_h_applycal -- %s" % self._info_desc_)
        self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
        self.__vis_inp( )
        self.__field_inp( )
        self.__intent_inp( )
        self.__spw_inp( )
        self.__antenna_inp( )
        self.__applymode_inp( )
        self.__flagbackup_inp( )
        self.__flagsum_inp( )
        self.__flagdetailedsum_inp( )
        self.__pipelinemode_inp( )
        self.__dryrun_inp( )
        self.__acceptresults_inp( )
        self.__parallel_inp( )

    #--------- tget function ----------------------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def tget(self, file=None):
        """Load parameter values from a saved file into the shell globals.

        file -- path of the file to load; when None, "hpc_h_applycal.last" in
                the current directory is used. If no usable file is found the
                global defaults are restored instead.
        """
        from casashell.private.stack_manip import find_frame
        from runpy import run_path
        filename = None
        if file is None:
            if os.path.isfile("hpc_h_applycal.last"):
                filename = "hpc_h_applycal.last"
        elif isinstance(file, str):
            if os.path.isfile(file):
                filename = file
        if filename is not None:
            glob = find_frame( )
            # NOTE(review): run_path executes the file as Python code, so only
            # load files from a trusted source.
            newglob = run_path( filename, init_globals={ } )
            for name, value in newglob.items():
                # run_path also returns module bookkeeping entries (__name__,
                # __file__, __builtins__, ...); copying those would clobber
                # the shell's own values, so only real variables are taken.
                if name.startswith('__') and name.endswith('__'):
                    continue
                glob[name] = value
            self.tget.state['last'] = self
        else:
            print("could not find last file, setting defaults instead...")
            self.set_global_defaults( )

    def __call__( self, vis=None, field=None, intent=None, spw=None, antenna=None, applymode=None, flagbackup=None, flagsum=None, flagdetailedsum=None, pipelinemode=None, dryrun=None, acceptresults=None, parallel=None ):
        """Run the task and return its result.

        If at least one argument is not None the call is treated as a normal
        Python call and unspecified parameters take their defaults; otherwise
        every parameter is read from the shell globals (inp/go semantics).

        The resolved parameters are written to "hpc_h_applycal.pre", which is
        renamed to "hpc_h_applycal.last" after the task has run. If the task
        raises an Exception it is logged through casalog and False is returned.
        """
        def noobj(s):
            # reprs such as '<object at 0x...>' cannot be read back in; save None
            if s.startswith('<') and s.endswith('>'):
                return "None"
            return s

        _prefile = os.path.realpath('hpc_h_applycal.pre')
        _postfile = os.path.realpath('hpc_h_applycal.last')
        _return_result_ = None
        _arguments = [vis,field,intent,spw,antenna,applymode,flagbackup,flagsum,flagdetailedsum,pipelinemode,dryrun,acceptresults,parallel]
        _invocation_parameters = OrderedDict( )
        if any(_arg is not None for _arg in _arguments):
            # invoke python style
            # set the non sub-parameters that are not None
            local_global = { }
            if applymode is not None:
                local_global['applymode'] = applymode
            if flagsum is not None:
                local_global['flagsum'] = flagsum
            if pipelinemode is not None:
                local_global['pipelinemode'] = pipelinemode

            # the invocation parameters for the non-subparameters can now be set - this picks up those defaults
            _invocation_parameters['applymode'] = self.__applymode( local_global )
            _invocation_parameters['flagsum'] = self.__flagsum( local_global )
            _invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )

            # the sub-parameters can then be set. Use the supplied value if not None,
            # else the function, which gets the appropriate default
            _invocation_parameters['vis'] = self.__vis( _invocation_parameters ) if vis is None else vis
            _invocation_parameters['field'] = self.__field( _invocation_parameters ) if field is None else field
            _invocation_parameters['intent'] = self.__intent( _invocation_parameters ) if intent is None else intent
            _invocation_parameters['spw'] = self.__spw( _invocation_parameters ) if spw is None else spw
            _invocation_parameters['antenna'] = self.__antenna( _invocation_parameters ) if antenna is None else antenna
            _invocation_parameters['flagbackup'] = self.__flagbackup( _invocation_parameters ) if flagbackup is None else flagbackup
            _invocation_parameters['flagdetailedsum'] = self.__flagdetailedsum( _invocation_parameters ) if flagdetailedsum is None else flagdetailedsum
            _invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun
            _invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults
            _invocation_parameters['parallel'] = self.__parallel( _invocation_parameters ) if parallel is None else parallel
        else:
            # invoke with inp/go semantics
            _glb = self.__globals_( )
            _invocation_parameters['vis'] = self.__vis( _glb )
            _invocation_parameters['field'] = self.__field( _glb )
            _invocation_parameters['intent'] = self.__intent( _glb )
            _invocation_parameters['spw'] = self.__spw( _glb )
            _invocation_parameters['antenna'] = self.__antenna( _glb )
            _invocation_parameters['applymode'] = self.__applymode( _glb )
            _invocation_parameters['flagbackup'] = self.__flagbackup( _glb )
            _invocation_parameters['flagsum'] = self.__flagsum( _glb )
            _invocation_parameters['flagdetailedsum'] = self.__flagdetailedsum( _glb )
            _invocation_parameters['pipelinemode'] = self.__pipelinemode( _glb )
            _invocation_parameters['dryrun'] = self.__dryrun( _glb )
            _invocation_parameters['acceptresults'] = self.__acceptresults( _glb )
            _invocation_parameters['parallel'] = self.__parallel( _glb )

        # Saving the parameters is best effort: a failure here must not stop
        # the task. Only Exception is swallowed so that KeyboardInterrupt and
        # SystemExit still propagate.
        try:
            with open(_prefile, 'w') as _f:
                for _i in _invocation_parameters:
                    _f.write("%-15s = %s\n" % (_i, noobj(repr(_invocation_parameters[_i]))))
                _f.write("#hpc_h_applycal( ")
                _f.write(",".join("%s=%s" % (_i, noobj(repr(_invocation_parameters[_i])))
                                  for _i in _invocation_parameters))
                _f.write(" )\n")
        except Exception:
            pass

        try:
            _return_result_ = _hpc_h_applycal_t(
                *[_invocation_parameters[_name] for _name in self.__task_args_] )
        except Exception as e:
            from traceback import format_exc
            from casatasks import casalog
            casalog.origin('hpc_h_applycal')
            casalog.post("Exception Reported: Error in hpc_h_applycal: %s" % str(e),'SEVERE')
            casalog.post(format_exc( ))
            _return_result_ = False

        # The .pre file may not exist if writing it failed above.
        try:
            os.rename(_prefile, _postfile)
        except OSError:
            pass
        return _return_result_


hpc_h_applycal = _hpc_h_applycal( )