##################### generated by xml-casa (v2) from h_save.xml ####################
##################### 721a1fd06826cd6f42da993fce2f5433 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.h.cli import h_save as _h_save_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
def static_var(varname, value):
    """Return a decorator that attaches ``value`` to a function as attribute ``varname``.

    The decorated function itself is returned (it is not wrapped), so the
    attribute is reachable both as ``func.<varname>`` and, for methods,
    through the bound method (``self.method.<varname>``).

    Parameters
    ----------
    varname : str
        Name of the attribute to set on the decorated function.
    value : object
        Object stored under that attribute. It is stored by reference, so a
        mutable value is shared by every access.
    """
    def decorate(func):
        setattr(func, varname, value)
        return func
    return decorate
class _h_save:
    """
    h_save ---- Save the pipeline state to disk

    h_save saves the current pipeline state to disk using a unique filename.
    If no name is supplied one is generated automatically from a combination
    of the root name, 'context', the current stage number, and a time stamp.

    --------- parameter descriptions ---------------------------------------------

    filename Name of the saved pipeline state. If filename is '' then
             a unique name will be generated from several components: the
             root, 'context', the current stage number, and the time stamp.

    --------- examples -----------------------------------------------------------

    1. Save the current state in the default file

       h_save()

    2. Save the current state to a file called 'savestate_1'

       h_save(filename='savestate_1')

    """

    _info_group_ = """pipeline"""
    _info_desc_ = """Save the pipeline state to disk"""

    # Validation schema handed to the type checker, keyed by parameter name.
    __schema = {'filename': {'type': 'cStr', 'coerce': _coerce.to_str}}

    def __init__(self):
        # Optional output stream override; when None, inp output goes to sys.stdout.
        self.__stdout = None
        self.__stderr = None
        # Cached result of _find_frame(); resolved lazily by __globals_().
        self.__root_frame_ = None

    def __globals_(self):
        """Return the CASAshell global frame, looking it up once and caching it."""
        if self.__root_frame_ is None:
            self.__root_frame_ = _find_frame( )
            assert self.__root_frame_ is not None, "could not find CASAshell global frame"
        return self.__root_frame_

    def __to_string_(self,value):
        """Format ``value`` for inp display: strings are single-quoted, anything else uses str()."""
        # isinstance (rather than an exact type match) so str subclasses are quoted too.
        if isinstance(value, str):
            return "'%s'" % value
        return str(value)

    def __validate_(self,doc,schema):
        """Validate ``doc`` against ``schema`` by delegating to the casatools validator."""
        return _pc.validate(doc,schema)

    def __do_inp_output(self,param_prefix,description_str,formatting_chars):
        """Write one parameter line, with its word-wrapped description, to the output stream.

        param_prefix     -- already formatted "name = value" text (may contain
                            terminal escape sequences)
        description_str  -- description text; it is split on whitespace and
                            wrapped to ``self.term_width`` (set by ``inp``)
        formatting_chars -- number of non-printing characters in ``param_prefix``,
                            subtracted when measuring the first line's width
        """
        out = self.__stdout or sys.stdout
        description = description_str.split( )
        prefix_width = 23 + 8 + 4
        output = [ ]
        addon = ''
        first_addon = True
        while description:
            ## starting a new line.....................................................................
            if not output:
                ## for first line add parameter information............................................
                if len(param_prefix)-formatting_chars > prefix_width - 1:
                    ## prefix is too wide to share a line with the description.........................
                    output.append(param_prefix)
                    continue
                addon = param_prefix + ' #'
                first_addon = True
                addon_formatting = formatting_chars
            else:
                ## for subsequent lines space over prefix width........................................
                addon = (' ' * prefix_width) + '#'
                first_addon = False
                addon_formatting = 0
            ## if first word of description puts us over the screen width, bail........................
            if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                ## if we're doing the first line make sure it's output.................................
                if first_addon:
                    output.append(addon)
                break
            while description:
                ## if the next description word puts us over break for the next line...................
                if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
                    break
                addon = addon + ' ' + description[0]
                description.pop(0)
            output.append(addon)
        out.write('\n'.join(output) + '\n')

    #--------- return nonsubparam values ----------------------------------------------
    def __filename_dflt( self, glb ):
        """Return the default value of ``filename`` (the empty string)."""
        return ''

    def __filename( self, glb ):
        """Return ``glb['filename']`` when present, otherwise the default ''."""
        if 'filename' in glb:
            return glb['filename']
        return ''

    #--------- return inp/go default --------------------------------------------------
    #--------- return subparam values -------------------------------------------------
    #--------- subparam inp output ----------------------------------------------------
    def __filename_inp(self):
        """Print the inp line for ``filename``; a value failing validation is wrapped in red escapes."""
        description = 'Name for saved state'
        value = self.__filename( self.__globals_( ) )
        is_valid = self.__validate_({'filename': value},{'filename': self.__schema['filename']})
        (pre,post) = ('','') if is_valid else ('\x1B[91m','\x1B[0m')
        self.__do_inp_output('%-8.8s = %s%-23s%s' % ('filename',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))

    #--------- global default implementation-------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def set_global_defaults(self):
        """Record this task as the last one used and remove ``filename`` from the global frame."""
        self.set_global_defaults.state['last'] = self
        glb = self.__globals_( )
        if 'filename' in glb:
            del glb['filename']

    #--------- inp function -----------------------------------------------------------
    def inp(self):
        """Print the task summary line followed by the current value of each parameter."""
        print("# h_save -- %s" % self._info_desc_)
        self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
        self.__filename_inp( )

    #--------- tget function ----------------------------------------------------------
    @static_var('state', __sf__('casa_inp_go_state'))
    def tget(self,file=None):
        """Load parameter values from a saved-parameters file into the global frame.

        file -- path of the file to execute; when None, "h_save.last" in the
                current directory is used if it exists.

        If no usable file is found a message is printed and the global
        defaults are restored instead.
        """
        from casashell.private.stack_manip import find_frame
        from runpy import run_path
        filename = None
        if file is None:
            if os.path.isfile("h_save.last"):
                filename = "h_save.last"
        elif isinstance(file, str):
            if os.path.isfile(file):
                filename = file
        if filename is not None:
            glob = find_frame( )
            newglob = run_path( filename, init_globals={ } )
            for name, val in newglob.items():
                # run_path() also returns module bookkeeping (__name__, __file__,
                # __builtins__, ...); copying those would clobber the shell's own.
                if name.startswith('__') and name.endswith('__'):
                    continue
                glob[name] = val
            self.tget.state['last'] = self
        else:
            print("could not find last file, setting defaults instead...")
            self.set_global_defaults( )

    def __call__( self, filename=None ):
        """Run h_save and record the invocation in 'h_save.pre' / 'h_save.last'.

        filename -- name for the saved state. When None, inp/go semantics
                    apply and the value is read from the global frame.

        Returns the result of the underlying pipeline task, or False if it
        raised an exception (the error is posted to the CASA log).
        """
        def noobj(s):
            # repr() of arbitrary objects looks like '<...>'; record those as None.
            if s.startswith('<') and s.endswith('>'):
                return "None"
            return s

        _prefile = os.path.realpath('h_save.pre')
        _postfile = os.path.realpath('h_save.last')
        _return_result_ = None
        _invocation_parameters = OrderedDict( )
        if filename is not None:
            # invoke python style: only explicitly supplied values are consulted
            local_global = {'filename': filename}
            _invocation_parameters['filename'] = self.__filename( local_global )
        else:
            # invoke with inp/go semantics
            _invocation_parameters['filename'] = self.__filename( self.__globals_( ) )

        # Best effort: failing to record the invocation must not stop the task.
        try:
            with open(_prefile,'w') as _f:
                rendered = [(_i, noobj(repr(_v))) for _i, _v in _invocation_parameters.items()]
                for _i, _text in rendered:
                    _f.write("%-8s = %s\n" % (_i, _text))
                _f.write("#h_save( %s )\n" % ",".join("%s=%s" % pair for pair in rendered))
        except Exception:
            pass

        try:
            _return_result_ = _h_save_t( _invocation_parameters['filename'] )
        except Exception as e:
            from traceback import format_exc
            from casatasks import casalog
            casalog.origin('h_save')
            casalog.post("Exception Reported: Error in h_save: %s" % str(e),'SEVERE')
            casalog.post(format_exc( ))
            _return_result_ = False

        # Promote the invocation record to the ".last" file; ignore filesystem
        # errors (e.g. the ".pre" file could not be written above).
        try:
            os.rename(_prefile,_postfile)
        except OSError:
            pass
        return _return_result_
# Module-level task instance: calling h_save(...) dispatches to _h_save.__call__,
# and h_save.inp() / h_save.tget() / h_save.set_global_defaults() are its methods.
h_save = _h_save( )