##################### generated by xml-casa (v2) from uvcontfit.xml #################
##################### 5a367cdd5124dad67c3b70eafd7d5465 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hif.cli import uvcontfit as _uvcontfit_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
[docs]def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
class _uvcontfit:
"""
uvcontfit ---- Fit the continuum in the UV plane
Fit the continuum in the UV plane using polynomials.
--------- parameter descriptions ---------------------------------------------
vis The name of the input visibility file
caltable Name of output mueller matrix calibration table
field Select field(s) using id(s) or name(s)
intent Select intents
spw Spectral window / channels for fitting the continuum
combine Data axes to combine for the continuum estimation (none, spw and/or scan)
solint Time scale for the continuum fit
fitorder Polynomial order for the continuum fits
append Append to a pre-existing table
--------- examples -----------------------------------------------------------
This task estimates the continuum emission by fitting polynomials to
the real and imaginary parts of the spectral windows and channels
selected by spw and exclude spw. This fit represents a model of
the continuum in all channels. Fit orders less than 2 are strongly
recommended.
"""
_info_group_ = """modeling"""
_info_desc_ = """Fit the continuum in the UV plane"""
__schema = {'vis': {'type': 'cReqPath', 'coerce': _coerce.expand_path}, 'caltable': {'type': 'cStr', 'coerce': _coerce.to_str}, 'field': {'type': 'cStr', 'coerce': _coerce.to_str}, 'intent': {'type': 'cStr', 'coerce': _coerce.to_str}, 'spw': {'type': 'cStr', 'coerce': _coerce.to_str}, 'combine': {'type': 'cStr', 'coerce': _coerce.to_str}, 'solint': {'type': 'cVariant', 'coerce': [_coerce.to_variant]}, 'fitorder': {'type': 'cInt'}, 'append': {'type': 'cBool'}}
def __init__(self):
self.__stdout = None
self.__stderr = None
self.__root_frame_ = None
def __globals_(self):
if self.__root_frame_ is None:
self.__root_frame_ = _find_frame( )
assert self.__root_frame_ is not None, "could not find CASAshell global frame"
return self.__root_frame_
def __to_string_(self,value):
if type(value) is str:
return "'%s'" % value
else:
return str(value)
def __validate_(self,doc,schema):
return _pc.validate(doc,schema)
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
out = self.__stdout or sys.stdout
description = description_str.split( )
prefix_width = 23 + 8 + 4
output = [ ]
addon = ''
first_addon = True
while len(description) > 0:
## starting a new line.....................................................................
if len(output) == 0:
## for first line add parameter information............................................
if len(param_prefix)-formatting_chars > prefix_width - 1:
output.append(param_prefix)
continue
addon = param_prefix + ' #'
first_addon = True
addon_formatting = formatting_chars
else:
## for subsequent lines space over prefix width........................................
addon = (' ' * prefix_width) + '#'
first_addon = False
addon_formatting = 0
## if first word of description puts us over the screen width, bail........................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
## if we're doing the first line make sure it's output.................................
if first_addon: output.append(addon)
break
while len(description) > 0:
## if the next description word puts us over break for the next line...................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break
addon = addon + ' ' + description[0]
description.pop(0)
output.append(addon)
out.write('\n'.join(output) + '\n')
#--------- return nonsubparam values ----------------------------------------------
def __combine_dflt( self, glb ):
return ''
def __combine( self, glb ):
if 'combine' in glb: return glb['combine']
return ''
def __vis_dflt( self, glb ):
return ''
def __vis( self, glb ):
if 'vis' in glb: return glb['vis']
return ''
def __caltable_dflt( self, glb ):
return ''
def __caltable( self, glb ):
if 'caltable' in glb: return glb['caltable']
return ''
def __spw_dflt( self, glb ):
return ''
def __spw( self, glb ):
if 'spw' in glb: return glb['spw']
return ''
def __fitorder_dflt( self, glb ):
return int(0)
def __fitorder( self, glb ):
if 'fitorder' in glb: return glb['fitorder']
return int(0)
def __field_dflt( self, glb ):
return ''
def __field( self, glb ):
if 'field' in glb: return glb['field']
return ''
def __append_dflt( self, glb ):
return False
def __append( self, glb ):
if 'append' in glb: return glb['append']
return False
def __solint_dflt( self, glb ):
return 'int'
def __solint( self, glb ):
if 'solint' in glb: return glb['solint']
return 'int'
def __intent_dflt( self, glb ):
return ''
def __intent( self, glb ):
if 'intent' in glb: return glb['intent']
return ''
#--------- return inp/go default --------------------------------------------------
#--------- return subparam values -------------------------------------------------
#--------- subparam inp output ----------------------------------------------------
def __vis_inp(self):
description = 'The name of the input visibility file'
value = self.__vis( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'vis': value},{'vis': self.__schema['vis']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('vis',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __caltable_inp(self):
description = 'Name of output mueller matrix calibration table'
value = self.__caltable( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'caltable': value},{'caltable': self.__schema['caltable']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('caltable',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __field_inp(self):
description = 'Select field(s) using id(s) or name(s)'
value = self.__field( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'field': value},{'field': self.__schema['field']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('field',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __intent_inp(self):
description = 'Select intents'
value = self.__intent( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'intent': value},{'intent': self.__schema['intent']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('intent',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __spw_inp(self):
description = 'Spectral window / channels for fitting the continuum'
value = self.__spw( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'spw': value},{'spw': self.__schema['spw']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('spw',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __combine_inp(self):
description = 'Data axes to combine for the continuum estimation (none, spw and/or scan)'
value = self.__combine( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'combine': value},{'combine': self.__schema['combine']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('combine',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __solint_inp(self):
description = 'Time scale for the continuum fit'
value = self.__solint( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'solint': value},{'solint': self.__schema['solint']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('solint',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __fitorder_inp(self):
description = 'Polynomial order for the continuum fits'
value = self.__fitorder( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'fitorder': value},{'fitorder': self.__schema['fitorder']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('fitorder',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __append_inp(self):
description = 'Append to a pre-existing table'
value = self.__append( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'append': value},{'append': self.__schema['append']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-8.8s = %s%-23s%s' % ('append',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def set_global_defaults(self):
self.set_global_defaults.state['last'] = self
glb = self.__globals_( )
if 'field' in glb: del glb['field']
if 'intent' in glb: del glb['intent']
if 'vis' in glb: del glb['vis']
if 'fitorder' in glb: del glb['fitorder']
if 'combine' in glb: del glb['combine']
if 'solint' in glb: del glb['solint']
if 'caltable' in glb: del glb['caltable']
if 'spw' in glb: del glb['spw']
if 'append' in glb: del glb['append']
#--------- inp function -----------------------------------------------------------
def inp(self):
print("# uvcontfit -- %s" % self._info_desc_)
self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
self.__vis_inp( )
self.__caltable_inp( )
self.__field_inp( )
self.__intent_inp( )
self.__spw_inp( )
self.__combine_inp( )
self.__solint_inp( )
self.__fitorder_inp( )
self.__append_inp( )
#--------- tget function ----------------------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def tget(self,file=None):
from casashell.private.stack_manip import find_frame
from runpy import run_path
filename = None
if file is None:
if os.path.isfile("uvcontfit.last"):
filename = "uvcontfit.last"
elif isinstance(file, str):
if os.path.isfile(file):
filename = file
if filename is not None:
glob = find_frame( )
newglob = run_path( filename, init_globals={ } )
for i in newglob:
glob[i] = newglob[i]
self.tget.state['last'] = self
else:
print("could not find last file, setting defaults instead...")
self.set_global_defaults( )
def __call__( self, vis=None, caltable=None, field=None, intent=None, spw=None, combine=None, solint=None, fitorder=None, append=None ):
def noobj(s):
if s.startswith('<') and s.endswith('>'):
return "None"
else:
return s
_prefile = os.path.realpath('uvcontfit.pre')
_postfile = os.path.realpath('uvcontfit.last')
_return_result_ = None
_arguments = [vis,caltable,field,intent,spw,combine,solint,fitorder,append]
_invocation_parameters = OrderedDict( )
if any(map(lambda x: x is not None,_arguments)):
# invoke python style
# set the non sub-parameters that are not None
local_global = { }
if vis is not None: local_global['vis'] = vis
if caltable is not None: local_global['caltable'] = caltable
if field is not None: local_global['field'] = field
if intent is not None: local_global['intent'] = intent
if spw is not None: local_global['spw'] = spw
if combine is not None: local_global['combine'] = combine
if solint is not None: local_global['solint'] = solint
if fitorder is not None: local_global['fitorder'] = fitorder
if append is not None: local_global['append'] = append
# the invocation parameters for the non-subparameters can now be set - this picks up those defaults
_invocation_parameters['vis'] = self.__vis( local_global )
_invocation_parameters['caltable'] = self.__caltable( local_global )
_invocation_parameters['field'] = self.__field( local_global )
_invocation_parameters['intent'] = self.__intent( local_global )
_invocation_parameters['spw'] = self.__spw( local_global )
_invocation_parameters['combine'] = self.__combine( local_global )
_invocation_parameters['solint'] = self.__solint( local_global )
_invocation_parameters['fitorder'] = self.__fitorder( local_global )
_invocation_parameters['append'] = self.__append( local_global )
# the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
else:
# invoke with inp/go semantics
_invocation_parameters['vis'] = self.__vis( self.__globals_( ) )
_invocation_parameters['caltable'] = self.__caltable( self.__globals_( ) )
_invocation_parameters['field'] = self.__field( self.__globals_( ) )
_invocation_parameters['intent'] = self.__intent( self.__globals_( ) )
_invocation_parameters['spw'] = self.__spw( self.__globals_( ) )
_invocation_parameters['combine'] = self.__combine( self.__globals_( ) )
_invocation_parameters['solint'] = self.__solint( self.__globals_( ) )
_invocation_parameters['fitorder'] = self.__fitorder( self.__globals_( ) )
_invocation_parameters['append'] = self.__append( self.__globals_( ) )
try:
with open(_prefile,'w') as _f:
for _i in _invocation_parameters:
_f.write("%-8s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i]))))
_f.write("#uvcontfit( ")
count = 0
for _i in _invocation_parameters:
_f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i]))))
count += 1
if count < len(_invocation_parameters): _f.write(",")
_f.write(" )\n")
except: pass
try:
_return_result_ = _uvcontfit_t( _invocation_parameters['vis'],_invocation_parameters['caltable'],_invocation_parameters['field'],_invocation_parameters['intent'],_invocation_parameters['spw'],_invocation_parameters['combine'],_invocation_parameters['solint'],_invocation_parameters['fitorder'],_invocation_parameters['append'] )
except Exception as e:
from traceback import format_exc
from casatasks import casalog
casalog.origin('uvcontfit')
casalog.post("Exception Reported: Error in uvcontfit: %s" % str(e),'SEVERE')
casalog.post(format_exc( ))
_return_result_ = False
try:
os.rename(_prefile,_postfile)
except: pass
return _return_result_
uvcontfit = _uvcontfit( )