##################### generated by xml-casa (v2) from hif_setmodels.xml #############
##################### 914ee000d03f4468b827af2887554753 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hif.cli import hif_setmodels as _hif_setmodels_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
[docs]def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
class _hif_setmodels:
"""
hif_setmodels ---- Set calibrator source models
Set model fluxes values for calibrator reference and transfer sources using lookup
values. By default the reference sources are the flux calibrators and the transfer
sources are the bandpass, phase, and check source calibrators. Reference sources
which are also in the transfer source list are removed from the transfer source list.
Built-in lookup tables are used to compute models for solar system object calibrators.
Point source models for other calibrators are provided in the reference file.
Normalize fluxes are computed for transfer sources if the normfluxes parameter is
set to True.
The reference file default to a file called 'flux.csv' in the current working
directory. This file is normal created in the importdata step. The file is in
'csv' format and contains the following comma delimited columns.
vis,fieldid,spwid,I,Q,U,V,pix,comment
Output:
results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
the results object for the pipeline task is returned
--------- parameter descriptions ---------------------------------------------
vis The list of input MeasurementSets. Defaults to the list of
MeasurementSets specified in the pipeline context.
example: ['M32A.ms', 'M32B.ms']
reference A string containing a comma delimited list of field names
defining the reference calibrators. Defaults to field names with
intent 'AMPLITUDE'.
example: 'M82,3C273'
refintent A string containing a comma delimited list of intents
used to select the reference calibrators. Defaults to 'AMPLITUDE'.
example: 'BANDPASS'
transfer A string containing a comma delimited list of field names
defining the transfer calibrators. Defaults to field names with
intent ''.
example: 'J1328+041,J1206+30'
transintent A string containing a comma delimited list of intents
defining the transfer calibrators. Defaults to 'BANDPASS,PHASE,CHECK'.
'' stands for no transfer sources.
example: 'PHASE'
reffile The reference file containing a lookup table of point source models
This file currently defaults to 'flux.csv' in the working directory. This
file must conform to the standard pipeline 'flux.csv' format
example: 'myfluxes.csv'
normfluxes
scalebychan lux density on a per channel basis or else on a per spw basis
example: False
pipelinemode The pipeline operating mode. In 'automatic' mode the pipeline
determines the values of all context defined pipeline inputs automatically.
In interactive mode the user can set the pipeline context defined
parameters manually. In 'getinputs' mode the users can check the settings
of all pipeline parameters without running the task.
dryrun Run the commands (True) or generate the commands to be run but
do not execute (False).
acceptresults Add the results of the task to the pipeline context (True) or
reject them (False).
--------- examples -----------------------------------------------------------
1. Set model fluxes for the flux, bandpass, phase, and check sources.
hif_setmodels()
"""
_info_group_ = """pipeline"""
_info_desc_ = """Set calibrator source models"""
__schema = {'vis': {'type': 'cStr', 'coerce': _coerce.to_str}, 'reference': {'type': 'cVariant', 'coerce': [_coerce.to_variant]}, 'refintent': {'type': 'cStr', 'coerce': _coerce.to_str}, 'transfer': {'type': 'cVariant', 'coerce': [_coerce.to_variant]}, 'transintent': {'type': 'cStr', 'coerce': _coerce.to_str}, 'reffile': {'type': 'cStr', 'coerce': _coerce.to_str}, 'normfluxes': {'type': 'cBool'}, 'scalebychan': {'type': 'cBool'}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}}
def __init__(self):
self.__stdout = None
self.__stderr = None
self.__root_frame_ = None
def __globals_(self):
if self.__root_frame_ is None:
self.__root_frame_ = _find_frame( )
assert self.__root_frame_ is not None, "could not find CASAshell global frame"
return self.__root_frame_
def __to_string_(self,value):
if type(value) is str:
return "'%s'" % value
else:
return str(value)
def __validate_(self,doc,schema):
return _pc.validate(doc,schema)
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
out = self.__stdout or sys.stdout
description = description_str.split( )
prefix_width = 23 + 16 + 4
output = [ ]
addon = ''
first_addon = True
while len(description) > 0:
## starting a new line.....................................................................
if len(output) == 0:
## for first line add parameter information............................................
if len(param_prefix)-formatting_chars > prefix_width - 1:
output.append(param_prefix)
continue
addon = param_prefix + ' #'
first_addon = True
addon_formatting = formatting_chars
else:
## for subsequent lines space over prefix width........................................
addon = (' ' * prefix_width) + '#'
first_addon = False
addon_formatting = 0
## if first word of description puts us over the screen width, bail........................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
## if we're doing the first line make sure it's output.................................
if first_addon: output.append(addon)
break
while len(description) > 0:
## if the next description word puts us over break for the next line...................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break
addon = addon + ' ' + description[0]
description.pop(0)
output.append(addon)
out.write('\n'.join(output) + '\n')
#--------- return nonsubparam values ----------------------------------------------
def __scalebychan_dflt( self, glb ):
return True
def __scalebychan( self, glb ):
if 'scalebychan' in glb: return glb['scalebychan']
return True
def __normfluxes_dflt( self, glb ):
return True
def __normfluxes( self, glb ):
if 'normfluxes' in glb: return glb['normfluxes']
return True
def __pipelinemode_dflt( self, glb ):
return 'automatic'
def __pipelinemode( self, glb ):
if 'pipelinemode' in glb: return glb['pipelinemode']
return 'automatic'
def __reffile_dflt( self, glb ):
return ''
def __reffile( self, glb ):
if 'reffile' in glb: return glb['reffile']
return ''
#--------- return inp/go default --------------------------------------------------
def __reference_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return { }
if self.__pipelinemode( glb ) == "getinputs": return { }
return None
def __dryrun_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
return None
def __transintent_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return "BANDPASS,PHASE,CHECK"
if self.__pipelinemode( glb ) == "getinputs": return ""
return None
def __vis_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return ""
if self.__pipelinemode( glb ) == "getinputs": return ""
return None
def __acceptresults_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
return None
def __refintent_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return "AMPLITUDE"
if self.__pipelinemode( glb ) == "getinputs": return "AMPLITUDE"
return None
def __transfer_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return { }
if self.__pipelinemode( glb ) == "getinputs": return BANDPASS,PHASE,CHECK
return None
#--------- return subparam values -------------------------------------------------
def __vis( self, glb ):
if 'vis' in glb: return glb['vis']
dflt = self.__vis_dflt( glb )
if dflt is not None: return dflt
return ''
def __reference( self, glb ):
if 'reference' in glb: return glb['reference']
dflt = self.__reference_dflt( glb )
if dflt is not None: return dflt
return ''
def __refintent( self, glb ):
if 'refintent' in glb: return glb['refintent']
dflt = self.__refintent_dflt( glb )
if dflt is not None: return dflt
return 'AMPLITUDE'
def __transfer( self, glb ):
if 'transfer' in glb: return glb['transfer']
dflt = self.__transfer_dflt( glb )
if dflt is not None: return dflt
return ''
def __transintent( self, glb ):
if 'transintent' in glb: return glb['transintent']
dflt = self.__transintent_dflt( glb )
if dflt is not None: return dflt
return 'BANDPASS'
def __dryrun( self, glb ):
if 'dryrun' in glb: return glb['dryrun']
dflt = self.__dryrun_dflt( glb )
if dflt is not None: return dflt
return False
def __acceptresults( self, glb ):
if 'acceptresults' in glb: return glb['acceptresults']
dflt = self.__acceptresults_dflt( glb )
if dflt is not None: return dflt
return True
#--------- subparam inp output ----------------------------------------------------
def __vis_inp(self):
if self.__vis_dflt( self.__globals_( ) ) is not None:
description = 'List of input MeasurementSets'
value = self.__vis( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'vis': value},{'vis': self.__schema['vis']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('vis',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __reference_inp(self):
if self.__reference_dflt( self.__globals_( ) ) is not None:
description = 'Reference calibrator field name(s)'
value = self.__reference( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'reference': value},{'reference': self.__schema['reference']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('reference',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __refintent_inp(self):
if self.__refintent_dflt( self.__globals_( ) ) is not None:
description = 'Observing intent of reference fields'
value = self.__refintent( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'refintent': value},{'refintent': self.__schema['refintent']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('refintent',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __transfer_inp(self):
if self.__transfer_dflt( self.__globals_( ) ) is not None:
description = 'Transfer calibrator field name(s)'
value = self.__transfer( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'transfer': value},{'transfer': self.__schema['transfer']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('transfer',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __transintent_inp(self):
if self.__transintent_dflt( self.__globals_( ) ) is not None:
description = 'Observing intent of transfer fields'
value = self.__transintent( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'transintent': value},{'transintent': self.__schema['transintent']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('transintent',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __reffile_inp(self):
description = 'Path to file with fluxes for non-solar system calibrators'
value = self.__reffile( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'reffile': value},{'reffile': self.__schema['reffile']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('reffile',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __normfluxes_inp(self):
description = 'Normalize lookup fluxes'
value = self.__normfluxes( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'normfluxes': value},{'normfluxes': self.__schema['normfluxes']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('normfluxes',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __scalebychan_inp(self):
description = 'Scale the flux density on a per channel basis or else on a per spw basis'
value = self.__scalebychan( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'scalebychan': value},{'scalebychan': self.__schema['scalebychan']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('scalebychan',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __pipelinemode_inp(self):
description = 'The pipeline operating mode'
value = self.__pipelinemode( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post),description,13+len(pre)+len(post))
def __dryrun_inp(self):
if self.__dryrun_dflt( self.__globals_( ) ) is not None:
description = 'Run the task (False) or display commands (True)'
value = self.__dryrun( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __acceptresults_inp(self):
if self.__acceptresults_dflt( self.__globals_( ) ) is not None:
description = 'Automatically accept results into context'
value = self.__acceptresults( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def set_global_defaults(self):
self.set_global_defaults.state['last'] = self
glb = self.__globals_( )
if 'reference' in glb: del glb['reference']
if 'dryrun' in glb: del glb['dryrun']
if 'pipelinemode' in glb: del glb['pipelinemode']
if 'transintent' in glb: del glb['transintent']
if 'vis' in glb: del glb['vis']
if 'acceptresults' in glb: del glb['acceptresults']
if 'refintent' in glb: del glb['refintent']
if 'transfer' in glb: del glb['transfer']
if 'scalebychan' in glb: del glb['scalebychan']
if 'normfluxes' in glb: del glb['normfluxes']
if 'reffile' in glb: del glb['reffile']
#--------- inp function -----------------------------------------------------------
def inp(self):
print("# hif_setmodels -- %s" % self._info_desc_)
self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
self.__vis_inp( )
self.__reference_inp( )
self.__refintent_inp( )
self.__transfer_inp( )
self.__transintent_inp( )
self.__reffile_inp( )
self.__normfluxes_inp( )
self.__scalebychan_inp( )
self.__pipelinemode_inp( )
self.__dryrun_inp( )
self.__acceptresults_inp( )
#--------- tget function ----------------------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def tget(self,file=None):
from casashell.private.stack_manip import find_frame
from runpy import run_path
filename = None
if file is None:
if os.path.isfile("hif_setmodels.last"):
filename = "hif_setmodels.last"
elif isinstance(file, str):
if os.path.isfile(file):
filename = file
if filename is not None:
glob = find_frame( )
newglob = run_path( filename, init_globals={ } )
for i in newglob:
glob[i] = newglob[i]
self.tget.state['last'] = self
else:
print("could not find last file, setting defaults instead...")
self.set_global_defaults( )
def __call__( self, vis=None, reference=None, refintent=None, transfer=None, transintent=None, reffile=None, normfluxes=None, scalebychan=None, pipelinemode=None, dryrun=None, acceptresults=None ):
def noobj(s):
if s.startswith('<') and s.endswith('>'):
return "None"
else:
return s
_prefile = os.path.realpath('hif_setmodels.pre')
_postfile = os.path.realpath('hif_setmodels.last')
_return_result_ = None
_arguments = [vis,reference,refintent,transfer,transintent,reffile,normfluxes,scalebychan,pipelinemode,dryrun,acceptresults]
_invocation_parameters = OrderedDict( )
if any(map(lambda x: x is not None,_arguments)):
# invoke python style
# set the non sub-parameters that are not None
local_global = { }
if reffile is not None: local_global['reffile'] = reffile
if normfluxes is not None: local_global['normfluxes'] = normfluxes
if scalebychan is not None: local_global['scalebychan'] = scalebychan
if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode
# the invocation parameters for the non-subparameters can now be set - this picks up those defaults
_invocation_parameters['reffile'] = self.__reffile( local_global )
_invocation_parameters['normfluxes'] = self.__normfluxes( local_global )
_invocation_parameters['scalebychan'] = self.__scalebychan( local_global )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )
# the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
_invocation_parameters['vis'] = self.__vis( _invocation_parameters ) if vis is None else vis
_invocation_parameters['reference'] = self.__reference( _invocation_parameters ) if reference is None else reference
_invocation_parameters['refintent'] = self.__refintent( _invocation_parameters ) if refintent is None else refintent
_invocation_parameters['transfer'] = self.__transfer( _invocation_parameters ) if transfer is None else transfer
_invocation_parameters['transintent'] = self.__transintent( _invocation_parameters ) if transintent is None else transintent
_invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun
_invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults
else:
# invoke with inp/go semantics
_invocation_parameters['vis'] = self.__vis( self.__globals_( ) )
_invocation_parameters['reference'] = self.__reference( self.__globals_( ) )
_invocation_parameters['refintent'] = self.__refintent( self.__globals_( ) )
_invocation_parameters['transfer'] = self.__transfer( self.__globals_( ) )
_invocation_parameters['transintent'] = self.__transintent( self.__globals_( ) )
_invocation_parameters['reffile'] = self.__reffile( self.__globals_( ) )
_invocation_parameters['normfluxes'] = self.__normfluxes( self.__globals_( ) )
_invocation_parameters['scalebychan'] = self.__scalebychan( self.__globals_( ) )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( self.__globals_( ) )
_invocation_parameters['dryrun'] = self.__dryrun( self.__globals_( ) )
_invocation_parameters['acceptresults'] = self.__acceptresults( self.__globals_( ) )
try:
with open(_prefile,'w') as _f:
for _i in _invocation_parameters:
_f.write("%-13s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i]))))
_f.write("#hif_setmodels( ")
count = 0
for _i in _invocation_parameters:
_f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i]))))
count += 1
if count < len(_invocation_parameters): _f.write(",")
_f.write(" )\n")
except: pass
try:
_return_result_ = _hif_setmodels_t( _invocation_parameters['vis'],_invocation_parameters['reference'],_invocation_parameters['refintent'],_invocation_parameters['transfer'],_invocation_parameters['transintent'],_invocation_parameters['reffile'],_invocation_parameters['normfluxes'],_invocation_parameters['scalebychan'],_invocation_parameters['pipelinemode'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] )
except Exception as e:
from traceback import format_exc
from casatasks import casalog
casalog.origin('hif_setmodels')
casalog.post("Exception Reported: Error in hif_setmodels: %s" % str(e),'SEVERE')
casalog.post(format_exc( ))
_return_result_ = False
try:
os.rename(_prefile,_postfile)
except: pass
return _return_result_
hif_setmodels = _hif_setmodels( )