##################### generated by xml-casa (v2) from h_init.xml ####################
##################### ae6e964178930a754f57da90b5711411 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.h.cli import h_init as _h_init_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
[docs]def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
class _h_init:
"""
h_init ---- Initialize the interferometry pipeline
The h_init task initializes the interferometry pipeline and optionally
imports data.
h_init must be called before any other interferometry pipeline task. The
pipeline can be initialized in one of two ways: by creating a new pipeline
state (h_init) or be loading a saved pipeline state (h_resume).
h_init creates an empty pipeline context but does not load visibility data
into the context. hif_importdata or hsd_importdata can be used to load data.
If pipeline mode is 'getinputs' then None is returned. Otherwise
the results object for the pipeline task is returned.
--------- parameter descriptions ---------------------------------------------
pipelinemode The pipeline operating mode. In 'automatic' mode the pipeline determines the values of all context defined pipeline inputs automatically. In interactive mode the user can set the pipeline context defined parameters manually. In 'getinputs' mode the user can check the settings of all pipeline parameters without running the task.
loglevel Log level for pipeline messages. Log messages below this threshold will not be displayed.
plotlevel Toggle generation of detail plots in the web log. A level of 'all' generates
all plots; 'summary' omits detail plots; 'default' generates all plots
apart from for the hif_applycal task.
output_dir Working directory for pipeline processing. Some pipeline
processing products such as HTML logs and images will be directed to
subdirectories of this path.
weblog Generate the web log
overwrite Overwrite existing files on import
dryrun Run the task (False) or display the task command (True)
acceptresults Add the results into the pipeline context
--------- examples -----------------------------------------------------------
Examples
1. Create the pipeline context
h_init()
"""
_info_group_ = """pipeline"""
_info_desc_ = """Initialize the interferometry pipeline"""
__schema = {'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'loglevel': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'error', 'info', 'critical', 'debug', 'trace', 'warning' ]}, 'plotlevel': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'all', 'default', 'summary' ]}, 'output_dir': {'type': 'cStr', 'coerce': _coerce.to_str}, 'weblog': {'type': 'cBool'}, 'overwrite': {'type': 'cBool'}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}}
def __init__(self):
self.__stdout = None
self.__stderr = None
self.__root_frame_ = None
def __globals_(self):
if self.__root_frame_ is None:
self.__root_frame_ = _find_frame( )
assert self.__root_frame_ is not None, "could not find CASAshell global frame"
return self.__root_frame_
def __to_string_(self,value):
if type(value) is str:
return "'%s'" % value
else:
return str(value)
def __validate_(self,doc,schema):
return _pc.validate(doc,schema)
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
out = self.__stdout or sys.stdout
description = description_str.split( )
prefix_width = 23 + 16 + 4
output = [ ]
addon = ''
first_addon = True
while len(description) > 0:
## starting a new line.....................................................................
if len(output) == 0:
## for first line add parameter information............................................
if len(param_prefix)-formatting_chars > prefix_width - 1:
output.append(param_prefix)
continue
addon = param_prefix + ' #'
first_addon = True
addon_formatting = formatting_chars
else:
## for subsequent lines space over prefix width........................................
addon = (' ' * prefix_width) + '#'
first_addon = False
addon_formatting = 0
## if first word of description puts us over the screen width, bail........................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
## if we're doing the first line make sure it's output.................................
if first_addon: output.append(addon)
break
while len(description) > 0:
## if the next description word puts us over break for the next line...................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break
addon = addon + ' ' + description[0]
description.pop(0)
output.append(addon)
out.write('\n'.join(output) + '\n')
#--------- return nonsubparam values ----------------------------------------------
def __pipelinemode_dflt( self, glb ):
return 'automatic'
def __pipelinemode( self, glb ):
if 'pipelinemode' in glb: return glb['pipelinemode']
return 'automatic'
#--------- return inp/go default --------------------------------------------------
def __dryrun_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
return None
def __output_dir_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return "./"
if self.__pipelinemode( glb ) == "getinputs": return "./"
return None
def __plotlevel_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return "default"
if self.__pipelinemode( glb ) == "getinputs": return "default"
return None
def __acceptresults_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
return None
def __weblog_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
if self.__pipelinemode( glb ) == "getinputs": return bool(True)
return None
def __overwrite_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
if self.__pipelinemode( glb ) == "getinputs": return bool(True)
return None
def __loglevel_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return "info"
if self.__pipelinemode( glb ) == "getinputs": return "info"
return None
#--------- return subparam values -------------------------------------------------
def __loglevel( self, glb ):
if 'loglevel' in glb: return glb['loglevel']
dflt = self.__loglevel_dflt( glb )
if dflt is not None: return dflt
return 'info'
def __plotlevel( self, glb ):
if 'plotlevel' in glb: return glb['plotlevel']
dflt = self.__plotlevel_dflt( glb )
if dflt is not None: return dflt
return 'default'
def __output_dir( self, glb ):
if 'output_dir' in glb: return glb['output_dir']
dflt = self.__output_dir_dflt( glb )
if dflt is not None: return dflt
return './'
def __weblog( self, glb ):
if 'weblog' in glb: return glb['weblog']
dflt = self.__weblog_dflt( glb )
if dflt is not None: return dflt
return True
def __overwrite( self, glb ):
if 'overwrite' in glb: return glb['overwrite']
dflt = self.__overwrite_dflt( glb )
if dflt is not None: return dflt
return True
def __dryrun( self, glb ):
if 'dryrun' in glb: return glb['dryrun']
dflt = self.__dryrun_dflt( glb )
if dflt is not None: return dflt
return False
def __acceptresults( self, glb ):
if 'acceptresults' in glb: return glb['acceptresults']
dflt = self.__acceptresults_dflt( glb )
if dflt is not None: return dflt
return True
#--------- subparam inp output ----------------------------------------------------
def __pipelinemode_inp(self):
description = 'The pipeline operating mode'
value = self.__pipelinemode( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post),description,13+len(pre)+len(post))
def __loglevel_inp(self):
if self.__loglevel_dflt( self.__globals_( ) ) is not None:
description = 'Log level for pipeline messages'
value = self.__loglevel( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'loglevel': value},{'loglevel': self.__schema['loglevel']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('loglevel',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __plotlevel_inp(self):
if self.__plotlevel_dflt( self.__globals_( ) ) is not None:
description = 'Pipeline plot level threshold'
value = self.__plotlevel( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'plotlevel': value},{'plotlevel': self.__schema['plotlevel']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('plotlevel',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __output_dir_inp(self):
if self.__output_dir_dflt( self.__globals_( ) ) is not None:
description = 'The output working directory'
value = self.__output_dir( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'output_dir': value},{'output_dir': self.__schema['output_dir']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('output_dir',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __weblog_inp(self):
if self.__weblog_dflt( self.__globals_( ) ) is not None:
description = 'Generate the web log'
value = self.__weblog( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'weblog': value},{'weblog': self.__schema['weblog']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('weblog',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __overwrite_inp(self):
if self.__overwrite_dflt( self.__globals_( ) ) is not None:
description = 'Overwrite existing files on import'
value = self.__overwrite( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'overwrite': value},{'overwrite': self.__schema['overwrite']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('overwrite',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __dryrun_inp(self):
if self.__dryrun_dflt( self.__globals_( ) ) is not None:
description = 'Run the task (False) or display the task command (True)'
value = self.__dryrun( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __acceptresults_inp(self):
if self.__acceptresults_dflt( self.__globals_( ) ) is not None:
description = 'Add the results into the pipeline context'
value = self.__acceptresults( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def set_global_defaults(self):
self.set_global_defaults.state['last'] = self
glb = self.__globals_( )
if 'dryrun' in glb: del glb['dryrun']
if 'pipelinemode' in glb: del glb['pipelinemode']
if 'output_dir' in glb: del glb['output_dir']
if 'plotlevel' in glb: del glb['plotlevel']
if 'acceptresults' in glb: del glb['acceptresults']
if 'weblog' in glb: del glb['weblog']
if 'overwrite' in glb: del glb['overwrite']
if 'loglevel' in glb: del glb['loglevel']
#--------- inp function -----------------------------------------------------------
def inp(self):
print("# h_init -- %s" % self._info_desc_)
self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
self.__pipelinemode_inp( )
self.__loglevel_inp( )
self.__plotlevel_inp( )
self.__output_dir_inp( )
self.__weblog_inp( )
self.__overwrite_inp( )
self.__dryrun_inp( )
self.__acceptresults_inp( )
#--------- tget function ----------------------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def tget(self,file=None):
from casashell.private.stack_manip import find_frame
from runpy import run_path
filename = None
if file is None:
if os.path.isfile("h_init.last"):
filename = "h_init.last"
elif isinstance(file, str):
if os.path.isfile(file):
filename = file
if filename is not None:
glob = find_frame( )
newglob = run_path( filename, init_globals={ } )
for i in newglob:
glob[i] = newglob[i]
self.tget.state['last'] = self
else:
print("could not find last file, setting defaults instead...")
self.set_global_defaults( )
def __call__( self, pipelinemode=None, loglevel=None, plotlevel=None, output_dir=None, weblog=None, overwrite=None, dryrun=None, acceptresults=None ):
def noobj(s):
if s.startswith('<') and s.endswith('>'):
return "None"
else:
return s
_prefile = os.path.realpath('h_init.pre')
_postfile = os.path.realpath('h_init.last')
_return_result_ = None
_arguments = [pipelinemode,loglevel,plotlevel,output_dir,weblog,overwrite,dryrun,acceptresults]
_invocation_parameters = OrderedDict( )
if any(map(lambda x: x is not None,_arguments)):
# invoke python style
# set the non sub-parameters that are not None
local_global = { }
if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode
# the invocation parameters for the non-subparameters can now be set - this picks up those defaults
_invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )
# the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
_invocation_parameters['loglevel'] = self.__loglevel( _invocation_parameters ) if loglevel is None else loglevel
_invocation_parameters['plotlevel'] = self.__plotlevel( _invocation_parameters ) if plotlevel is None else plotlevel
_invocation_parameters['output_dir'] = self.__output_dir( _invocation_parameters ) if output_dir is None else output_dir
_invocation_parameters['weblog'] = self.__weblog( _invocation_parameters ) if weblog is None else weblog
_invocation_parameters['overwrite'] = self.__overwrite( _invocation_parameters ) if overwrite is None else overwrite
_invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun
_invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults
else:
# invoke with inp/go semantics
_invocation_parameters['pipelinemode'] = self.__pipelinemode( self.__globals_( ) )
_invocation_parameters['loglevel'] = self.__loglevel( self.__globals_( ) )
_invocation_parameters['plotlevel'] = self.__plotlevel( self.__globals_( ) )
_invocation_parameters['output_dir'] = self.__output_dir( self.__globals_( ) )
_invocation_parameters['weblog'] = self.__weblog( self.__globals_( ) )
_invocation_parameters['overwrite'] = self.__overwrite( self.__globals_( ) )
_invocation_parameters['dryrun'] = self.__dryrun( self.__globals_( ) )
_invocation_parameters['acceptresults'] = self.__acceptresults( self.__globals_( ) )
try:
with open(_prefile,'w') as _f:
for _i in _invocation_parameters:
_f.write("%-13s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i]))))
_f.write("#h_init( ")
count = 0
for _i in _invocation_parameters:
_f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i]))))
count += 1
if count < len(_invocation_parameters): _f.write(",")
_f.write(" )\n")
except: pass
try:
_return_result_ = _h_init_t( _invocation_parameters['pipelinemode'],_invocation_parameters['loglevel'],_invocation_parameters['plotlevel'],_invocation_parameters['output_dir'],_invocation_parameters['weblog'],_invocation_parameters['overwrite'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] )
except Exception as e:
from traceback import format_exc
from casatasks import casalog
casalog.origin('h_init')
casalog.post("Exception Reported: Error in h_init: %s" % str(e),'SEVERE')
casalog.post(format_exc( ))
_return_result_ = False
try:
os.rename(_prefile,_postfile)
except: pass
return _return_result_
h_init = _h_init( )