# Source code for pipeline.hif.cli.gotasks.uvcontfit

##################### generated by xml-casa (v2) from uvcontfit.xml #################
##################### 5a367cdd5124dad67c3b70eafd7d5465 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hif.cli import uvcontfit as _uvcontfit_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

def static_var(varname, value):
    """Build a decorator that stores ``value`` on the decorated function.

    The decorator sets the attribute named ``varname`` on the function object
    and returns that same function object (it is not wrapped), so the
    attribute can later be read as ``func.<varname>``.

    Args:
        varname: Name of the attribute to set on the decorated function.
        value: Object to store under that attribute.

    Returns:
        A decorator taking a function and returning it unchanged apart from
        the added attribute.
    """
    def decorate(func):
        setattr(func, varname, value)
        return func
    return decorate
class _uvcontfit:
    """
    uvcontfit ---- Fit the continuum in the UV plane

    Fit the continuum in the UV plane using polynomials.

    --------- parameter descriptions ---------------------------------------------

    vis       The name of the input visibility file
    caltable  Name of output mueller matrix calibration table
    field     Select field(s) using id(s) or name(s)
    intent    Select intents
    spw       Spectral window / channels for fitting the continuum
    combine   Data axes to combine for the continuum estimation (none, spw and/or scan)
    solint    Time scale for the continuum fit
    fitorder  Polynomial order for the continuum fits
    append    Append to a pre-existing table

    --------- examples -----------------------------------------------------------

    This task estimates the continuum emission by fitting polynomials to
    the real and imaginary parts of the spectral windows and channels
    selected by spw and exclude spw. This fit represents a model of the
    continuum in all channels. Fit orders less than 2 are strongly
    recommended.
    """

    _info_group_ = """modeling"""
    _info_desc_ = """Fit the continuum in the UV plane"""

    # Per-parameter validation schema handed to the casatools validator.
    __schema = {'vis': {'type': 'cReqPath', 'coerce': _coerce.expand_path},
                'caltable': {'type': 'cStr', 'coerce': _coerce.to_str},
                'field': {'type': 'cStr', 'coerce': _coerce.to_str},
                'intent': {'type': 'cStr', 'coerce': _coerce.to_str},
                'spw': {'type': 'cStr', 'coerce': _coerce.to_str},
                'combine': {'type': 'cStr', 'coerce': _coerce.to_str},
                'solint': {'type': 'cVariant', 'coerce': [_coerce.to_variant]},
                'fitorder': {'type': 'cInt'},
                'append': {'type': 'cBool'}}

    def __init__(self):
        self.__stdout = None
        self.__stderr = None
        self.__root_frame_ = None

    def __globals_(self):
        """Return the CASAshell global frame, looking it up once and caching it."""
        if self.__root_frame_ is None:
            self.__root_frame_ = _find_frame()
            assert self.__root_frame_ is not None, "could not find CASAshell global frame"
        return self.__root_frame_

    def __to_string_(self, value):
        """Render ``value`` for display; strings are wrapped in single quotes."""
        # isinstance (rather than an exact type check) so that str subclasses
        # are quoted like plain strings.
        if isinstance(value, str):
            return "'%s'" % value
        return str(value)

    def __validate_(self, doc, schema):
        """Validate ``doc`` against ``schema`` using the casatools validator."""
        return _pc.validate(doc, schema)

    def __do_inp_output(self, param_prefix, description_str, formatting_chars):
        """Write one parameter line plus its word-wrapped description.

        Args:
            param_prefix: Already formatted ``name = value`` text.
            description_str: Description text; it is split on whitespace and
                wrapped so that lines stay within ``self.term_width``.
            formatting_chars: Number of characters in ``param_prefix`` that
                are excluded from the width computation (callers pass the
                length of the colour escape sequences they added).

        Output goes to ``self.__stdout`` when set, otherwise ``sys.stdout``.
        ``self.term_width`` must have been set beforehand (``inp`` does so).
        """
        out = self.__stdout or sys.stdout
        description = description_str.split()
        prefix_width = 23 + 8 + 4
        output = []
        addon = ''
        first_addon = True
        while len(description) > 0:
            # Starting a new output line.
            if len(output) == 0:
                # First line carries the parameter information. If the prefix
                # alone is too wide, emit it by itself and wrap the
                # description onto the following lines.
                if len(param_prefix) - formatting_chars > prefix_width - 1:
                    output.append(param_prefix)
                    continue
                addon = param_prefix + ' #'
                first_addon = True
                addon_formatting = formatting_chars
            else:
                # Subsequent lines are indented by the prefix width.
                addon = (' ' * prefix_width) + '#'
                first_addon = False
                addon_formatting = 0
            # If even the first word does not fit on this line, give up on the
            # remaining description.
            if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                # Still make sure the parameter line itself is shown.
                if first_addon:
                    output.append(addon)
                break
            while len(description) > 0:
                # Stop filling this line once the next word would overflow it.
                if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                    break
                addon = addon + ' ' + description[0]
                description.pop(0)
            output.append(addon)
        out.write('\n'.join(output) + '\n')

    #--------- return nonsubparam values ----------------------------------------------
    # Each ``__<name>_dflt`` returns the parameter's default; each ``__<name>``
    # returns the value stored under that name in ``glb`` or the same default.

    def __combine_dflt(self, glb):
        return ''

    def __combine(self, glb):
        return glb.get('combine', '')

    def __vis_dflt(self, glb):
        return ''

    def __vis(self, glb):
        return glb.get('vis', '')

    def __caltable_dflt(self, glb):
        return ''

    def __caltable(self, glb):
        return glb.get('caltable', '')

    def __spw_dflt(self, glb):
        return ''

    def __spw(self, glb):
        return glb.get('spw', '')

    def __fitorder_dflt(self, glb):
        return int(0)

    def __fitorder(self, glb):
        return glb.get('fitorder', int(0))

    def __field_dflt(self, glb):
        return ''

    def __field(self, glb):
        return glb.get('field', '')

    def __append_dflt(self, glb):
        return False

    def __append(self, glb):
        return glb.get('append', False)

    def __solint_dflt(self, glb):
        return 'int'

    def __solint(self, glb):
        return glb.get('solint', 'int')

    def __intent_dflt(self, glb):
        return ''

    def __intent(self, glb):
        return glb.get('intent', '')

    #--------- return inp/go default --------------------------------------------------
    #--------- return subparam values -------------------------------------------------
    #--------- subparam inp output ----------------------------------------------------

    def __show_param(self, name, description, value):
        """Print one parameter for ``inp``; values failing validation are wrapped in colour escapes."""
        valid = self.__validate_({name: value}, {name: self.__schema[name]})
        (pre, post) = ('', '') if valid else ('\x1B[91m', '\x1B[0m')
        self.__do_inp_output('%-8.8s = %s%-23s%s' % (name, pre, self.__to_string_(value), post),
                             description, len(pre) + len(post))

    def __vis_inp(self):
        self.__show_param('vis', 'The name of the input visibility file',
                          self.__vis(self.__globals_()))

    def __caltable_inp(self):
        self.__show_param('caltable', 'Name of output mueller matrix calibration table',
                          self.__caltable(self.__globals_()))

    def __field_inp(self):
        self.__show_param('field', 'Select field(s) using id(s) or name(s)',
                          self.__field(self.__globals_()))

    def __intent_inp(self):
        self.__show_param('intent', 'Select intents',
                          self.__intent(self.__globals_()))

    def __spw_inp(self):
        self.__show_param('spw', 'Spectral window / channels for fitting the continuum',
                          self.__spw(self.__globals_()))

    def __combine_inp(self):
        self.__show_param('combine',
                          'Data axes to combine for the continuum estimation (none, spw and/or scan)',
                          self.__combine(self.__globals_()))

    def __solint_inp(self):
        self.__show_param('solint', 'Time scale for the continuum fit',
                          self.__solint(self.__globals_()))

    def __fitorder_inp(self):
        self.__show_param('fitorder', 'Polynomial order for the continuum fits',
                          self.__fitorder(self.__globals_()))

    def __append_inp(self):
        self.__show_param('append', 'Append to a pre-existing table',
                          self.__append(self.__globals_()))

    #--------- global default implementation-------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def set_global_defaults(self):
        """Remove this task's parameters from the global frame so defaults apply again."""
        self.set_global_defaults.state['last'] = self
        glb = self.__globals_()
        for name in ('field', 'intent', 'vis', 'fitorder', 'combine',
                     'solint', 'caltable', 'spw', 'append'):
            glb.pop(name, None)

    #--------- inp function -----------------------------------------------------------
    def inp(self):
        """Print the current value and description of every parameter."""
        print("# uvcontfit -- %s" % self._info_desc_)
        self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
        self.__vis_inp()
        self.__caltable_inp()
        self.__field_inp()
        self.__intent_inp()
        self.__spw_inp()
        self.__combine_inp()
        self.__solint_inp()
        self.__fitorder_inp()
        self.__append_inp()

    #--------- tget function ----------------------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def tget(self, file=None):
        """Load parameter values from a saved file into the global frame.

        Args:
            file: Path of the file to load. When ``None``, ``uvcontfit.last``
                in the current directory is used if it exists.

        When no usable file is found, a message is printed and the global
        defaults are restored instead.
        """
        from casashell.private.stack_manip import find_frame
        from runpy import run_path
        filename = None
        if file is None:
            if os.path.isfile("uvcontfit.last"):
                filename = "uvcontfit.last"
        elif isinstance(file, str):
            if os.path.isfile(file):
                filename = file
        if filename is not None:
            glob = find_frame()
            # NOTE(review): run_path executes the file as Python code; only
            # load parameter files from a trusted source.
            newglob = run_path(filename, init_globals={})
            glob.update(newglob)
            self.tget.state['last'] = self
        else:
            print("could not find last file, setting defaults instead...")
            self.set_global_defaults()

    def __call__( self, vis=None, caltable=None, field=None, intent=None, spw=None, combine=None, solint=None, fitorder=None, append=None ):
        """Run the task.

        If at least one argument is not ``None`` the call uses Python-style
        semantics: supplied arguments are used and the rest take their
        defaults. If every argument is ``None`` the call uses inp/go
        semantics and values are read from the global frame.

        The parameters used are written to ``uvcontfit.pre`` before the task
        runs and that file is renamed to ``uvcontfit.last`` afterwards; both
        steps are best-effort.

        Returns:
            Whatever the underlying task returns, or ``False`` if it raised
            an exception (which is reported through ``casalog``).
        """
        def noobj(s):
            # Object reprs such as '<... at 0x...>' cannot be read back from
            # the saved file, so they are stored as None.
            return "None" if s.startswith('<') and s.endswith('>') else s

        _prefile = os.path.realpath('uvcontfit.pre')
        _postfile = os.path.realpath('uvcontfit.last')
        _return_result_ = None
        _names = ('vis', 'caltable', 'field', 'intent', 'spw', 'combine', 'solint', 'fitorder', 'append')
        _arguments = [vis, caltable, field, intent, spw, combine, solint, fitorder, append]

        if any(_a is not None for _a in _arguments):
            # invoke python style: only the arguments that were supplied
            _source = {_n: _a for _n, _a in zip(_names, _arguments) if _a is not None}
        else:
            # invoke with inp/go semantics
            _source = self.__globals_()

        # The getters fall back to the parameter default for absent names.
        _invocation_parameters = OrderedDict()
        _invocation_parameters['vis'] = self.__vis(_source)
        _invocation_parameters['caltable'] = self.__caltable(_source)
        _invocation_parameters['field'] = self.__field(_source)
        _invocation_parameters['intent'] = self.__intent(_source)
        _invocation_parameters['spw'] = self.__spw(_source)
        _invocation_parameters['combine'] = self.__combine(_source)
        _invocation_parameters['solint'] = self.__solint(_source)
        _invocation_parameters['fitorder'] = self.__fitorder(_source)
        _invocation_parameters['append'] = self.__append(_source)

        try:
            with open(_prefile, 'w') as _f:
                _rendered = [(_i, noobj(repr(_v))) for _i, _v in _invocation_parameters.items()]
                for _i, _text in _rendered:
                    _f.write("%-8s = %s\n" % (_i, _text))
                _f.write("#uvcontfit( ")
                _f.write(",".join("%s=%s" % _pair for _pair in _rendered))
                _f.write(" )\n")
        except Exception:
            # Saving the parameters is best-effort and must not stop the task,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            pass

        try:
            _return_result_ = _uvcontfit_t(_invocation_parameters['vis'],
                                           _invocation_parameters['caltable'],
                                           _invocation_parameters['field'],
                                           _invocation_parameters['intent'],
                                           _invocation_parameters['spw'],
                                           _invocation_parameters['combine'],
                                           _invocation_parameters['solint'],
                                           _invocation_parameters['fitorder'],
                                           _invocation_parameters['append'])
        except Exception as e:
            from traceback import format_exc
            from casatasks import casalog
            casalog.origin('uvcontfit')
            casalog.post("Exception Reported: Error in uvcontfit: %s" % str(e), 'SEVERE')
            casalog.post(format_exc())
            _return_result_ = False

        try:
            os.rename(_prefile, _postfile)
        except OSError:
            # Best-effort: the pre-file may not exist or may not be movable.
            pass
        return _return_result_


uvcontfit = _uvcontfit( )