##################### generated by xml-casa (v2) from hsd_restoredata.xml ###########
##################### 795eee107c80cfd68a0104632eb58a51 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hsd.cli import hsd_restoredata as _hsd_restoredata_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
class _hsd_restoredata:
"""
hsd_restoredata ---- Restore flagged and calibrated single dish data from a pipeline run
The hsd_restoredata task restores flagged and calibrated MeasurementSets from
archived ASDMs and pipeline flagging and calibration data products. Pending
archive retrieval support, hsd_restoredata assumes that the required products
are available in rawdata_dir in the format produced by the hifa_exportdata
task.
hsd_restoredata assumes that the following entities are available in the raw
data directory (an example layout follows the list):
o the ASDMs to be restored
o for each ASDM in the input list
o a compressed tar file of the final flagversions file, e.g.
uid___A002_X30a93d_X43e.ms.flagversions.tar.gz
o a text file containing the applycal instructions, e.g.
uid___A002_X30a93d_X43e.ms.calapply.txt
o a compressed tar file containing the caltables for the parent session,
e.g. uid___A001_X74_X29.session_3.caltables.tar.gz
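For example, for the single ASDM above the raw data directory would contain
(names illustrative, taken from the examples above):
uid___A002_X30a93d_X43e
uid___A002_X30a93d_X43e.ms.flagversions.tar.gz
uid___A002_X30a93d_X43e.ms.calapply.txt
uid___A001_X74_X29.session_3.caltables.tar.gz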
hsd_restoredata performs the following operations
o imports the ASDM(s)
o removes the default MS.flagversions directory created by the filler
o restores the final MS.flagversions directory stored by the pipeline
o restores the final set of pipeline flags to the MS
o restores the final calibration state of the MS
o restores the final calibration tables for each MS
o applies the calibration tables to each MS
Output:
results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
the results object for the pipeline task is returned.
--------- parameter descriptions ---------------------------------------------
vis List of raw visibility data files to be restored. Assumed to be
in the directory specified by rawdata_dir.
example: vis=['uid___A002_X30a93d_X43e']
session List of sessions, one per visibility file.
example: session=['session_3']
products_dir Name of the data products directory to copy calibration
products from. The parameter is effective only when copytoraw = True.
When copytoraw = False, calibration products in rawdata_dir will be used.
example: products_dir='myproductspath'
copytoraw Copy calibration and flagging tables from products_dir to
rawdata_dir directory.
example: copytoraw=False
rawdata_dir Name of the raw data directory.
example: rawdata_dir='myrawdatapath'
lazy Use the lazy filler option
example: lazy=True
bdfflags Set the BDF flags
example: bdfflags=False
ocorr_mode Correlation import mode used when importing the ASDM(s).
example: ocorr_mode='ca'
asis Set the list of ASDM tables to import as is.
example: asis='Source Receiver'
pipelinemode The pipeline operating mode. In 'automatic' mode the pipeline
determines the values of all context defined pipeline inputs automatically.
In 'interactive' mode the user can set the pipeline context defined
parameters manually. In 'getinputs' mode the user can check the settings
of all pipeline parameters without running the task.
dryrun Run the task (False) or just display the commands that would be
executed without running them (True).
acceptresults Add the results of the task to the pipeline context (True) or
reject them (False).
--------- examples -----------------------------------------------------------
1. Restore the pipeline results for a single ASDM in a single session
hsd_restoredata (vis=['uid___A002_X30a93d_X43e'], session=['session_1'], ocorr_mode='ao')
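2. Restore the same ASDM using calibration products that have already been
copied into the raw data directory (illustrative; assumes the products from
hifa_exportdata are already present in rawdata_dir):
hsd_restoredata(vis=['uid___A002_X30a93d_X43e'], session=['session_1'], copytoraw=False, rawdata_dir='../rawdata')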
"""
_info_group_ = """pipeline"""
_info_desc_ = """Restore flagged and calibrated single dish data from a pipeline run"""
__schema = {'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'session': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'products_dir': {'type': 'cStr', 'coerce': _coerce.to_str}, 'copytoraw': {'type': 'cBool'}, 'rawdata_dir': {'type': 'cStr', 'coerce': _coerce.to_str}, 'lazy': {'type': 'cBool'}, 'bdfflags': {'type': 'cBool'}, 'ocorr_mode': {'type': 'cStr', 'coerce': _coerce.to_str}, 'asis': {'type': 'cStr', 'coerce': _coerce.to_str}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}}
def __init__(self):
self.__stdout = None
self.__stderr = None
self.__root_frame_ = None
def __globals_(self):
if self.__root_frame_ is None:
self.__root_frame_ = _find_frame( )
assert self.__root_frame_ is not None, "could not find CASAshell global frame"
return self.__root_frame_
def __to_string_(self,value):
if type(value) is str:
return "'%s'" % value
else:
return str(value)
def __validate_(self,doc,schema):
return _pc.validate(doc,schema)
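# Pretty-print a single parameter line for inp(): word-wrap the description to the
# current terminal width, with continuation lines aligned under a '#' comment column.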
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
out = self.__stdout or sys.stdout
description = description_str.split( )
prefix_width = 23 + 16 + 4
output = [ ]
addon = ''
first_addon = True
while len(description) > 0:
## starting a new line.....................................................................
if len(output) == 0:
## for first line add parameter information............................................
if len(param_prefix)-formatting_chars > prefix_width - 1:
output.append(param_prefix)
continue
addon = param_prefix + ' #'
first_addon = True
addon_formatting = formatting_chars
else:
## for subsequent lines space over prefix width........................................
addon = (' ' * prefix_width) + '#'
first_addon = False
addon_formatting = 0
## if first word of description puts us over the screen width, bail........................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
## if we're doing the first line make sure it's output.................................
if first_addon: output.append(addon)
break
while len(description) > 0:
## if the next description word puts us over break for the next line...................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break
addon = addon + ' ' + description[0]
description.pop(0)
output.append(addon)
out.write('\n'.join(output) + '\n')
#--------- return nonsubparam values ----------------------------------------------
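# Each parameter has two accessors: __<param>_dflt() returns the hard-coded default,
# while __<param>(glb) looks the value up in the supplied namespace (the CASA global
# frame for inp/go usage) and falls back to that default.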
def __rawdata_dir_dflt( self, glb ):
return '../rawdata'
def __rawdata_dir( self, glb ):
if 'rawdata_dir' in glb: return glb['rawdata_dir']
return '../rawdata'
def __session_dflt( self, glb ):
return [ ]
def __session( self, glb ):
if 'session' in glb: return glb['session']
return [ ]
def __vis_dflt( self, glb ):
return [ ]
def __vis( self, glb ):
if 'vis' in glb: return glb['vis']
return [ ]
def __lazy_dflt( self, glb ):
return False
def __lazy( self, glb ):
if 'lazy' in glb: return glb['lazy']
return False
def __asis_dflt( self, glb ):
return 'SBSummary ExecBlock Antenna Station Receiver Source CalAtmosphere CalWVR'
def __asis( self, glb ):
if 'asis' in glb: return glb['asis']
return 'SBSummary ExecBlock Antenna Station Receiver Source CalAtmosphere CalWVR'
def __copytoraw_dflt( self, glb ):
return True
def __copytoraw( self, glb ):
if 'copytoraw' in glb: return glb['copytoraw']
return True
def __pipelinemode_dflt( self, glb ):
return 'automatic'
def __pipelinemode( self, glb ):
if 'pipelinemode' in glb: return glb['pipelinemode']
return 'automatic'
def __bdfflags_dflt( self, glb ):
return True
def __bdfflags( self, glb ):
if 'bdfflags' in glb: return glb['bdfflags']
return True
def __ocorr_mode_dflt( self, glb ):
return 'ao'
def __ocorr_mode( self, glb ):
if 'ocorr_mode' in glb: return glb['ocorr_mode']
return 'ao'
def __products_dir_dflt( self, glb ):
return '../products'
def __products_dir( self, glb ):
if 'products_dir' in glb: return glb['products_dir']
return '../products'
#--------- return inp/go default --------------------------------------------------
def __acceptresults_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
return None
def __dryrun_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
return None
#--------- return subparam values -------------------------------------------------
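# dryrun and acceptresults are sub-parameters of pipelinemode: their defaults are only
# defined (and they are only displayed by inp) when pipelinemode is 'interactive'.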
def __dryrun( self, glb ):
if 'dryrun' in glb: return glb['dryrun']
dflt = self.__dryrun_dflt( glb )
if dflt is not None: return dflt
return False
def __acceptresults( self, glb ):
if 'acceptresults' in glb: return glb['acceptresults']
dflt = self.__acceptresults_dflt( glb )
if dflt is not None: return dflt
return True
#--------- subparam inp output ----------------------------------------------------
def __vis_inp(self):
description = 'List of input visibility data'
value = self.__vis( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'vis': value},{'vis': self.__schema['vis']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('vis',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __session_inp(self):
description = 'List of sessions one per visibility file'
value = self.__session( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'session': value},{'session': self.__schema['session']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('session',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __products_dir_inp(self):
description = 'The archived pipeline data products directory'
value = self.__products_dir( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'products_dir': value},{'products_dir': self.__schema['products_dir']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('products_dir',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __copytoraw_inp(self):
description = 'Copy calibration and flagging tables to raw data directory'
value = self.__copytoraw( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'copytoraw': value},{'copytoraw': self.__schema['copytoraw']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('copytoraw',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __rawdata_dir_inp(self):
description = 'The raw data directory'
value = self.__rawdata_dir( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'rawdata_dir': value},{'rawdata_dir': self.__schema['rawdata_dir']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('rawdata_dir',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __lazy_inp(self):
description = 'Use the lazy filler option'
value = self.__lazy( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'lazy': value},{'lazy': self.__schema['lazy']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('lazy',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __bdfflags_inp(self):
description = 'Set the BDF flags'
value = self.__bdfflags( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'bdfflags': value},{'bdfflags': self.__schema['bdfflags']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('bdfflags',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __ocorr_mode_inp(self):
description = 'Correlation import mode'
value = self.__ocorr_mode( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'ocorr_mode': value},{'ocorr_mode': self.__schema['ocorr_mode']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('ocorr_mode',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __asis_inp(self):
description = 'List of tables to import asis'
value = self.__asis( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'asis': value},{'asis': self.__schema['asis']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-16.16s = %s%-23s%s' % ('asis',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __pipelinemode_inp(self):
description = 'The pipeline operating mode'
value = self.__pipelinemode( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post),description,13+len(pre)+len(post))
def __dryrun_inp(self):
if self.__dryrun_dflt( self.__globals_( ) ) is not None:
description = 'Run the task (False) or display task command (True)'
value = self.__dryrun( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __acceptresults_inp(self):
if self.__acceptresults_dflt( self.__globals_( ) ) is not None:
description = 'Add the results into the pipeline context'
value = self.__acceptresults( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
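# Remove any hsd_restoredata parameters from the CASA global frame so that subsequent
# inp()/go calls fall back to the task defaults.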
@static_var('state', __sf__('casa_inp_go_state'))
def set_global_defaults(self):
self.set_global_defaults.state['last'] = self
glb = self.__globals_( )
if 'dryrun' in glb: del glb['dryrun']
if 'lazy' in glb: del glb['lazy']
if 'asis' in glb: del glb['asis']
if 'pipelinemode' in glb: del glb['pipelinemode']
if 'rawdata_dir' in glb: del glb['rawdata_dir']
if 'vis' in glb: del glb['vis']
if 'acceptresults' in glb: del glb['acceptresults']
if 'ocorr_mode' in glb: del glb['ocorr_mode']
if 'bdfflags' in glb: del glb['bdfflags']
if 'session' in glb: del glb['session']
if 'copytoraw' in glb: del glb['copytoraw']
if 'products_dir' in glb: del glb['products_dir']
#--------- inp function -----------------------------------------------------------
def inp(self):
print("# hsd_restoredata -- %s" % self._info_desc_)
self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
self.__vis_inp( )
self.__session_inp( )
self.__products_dir_inp( )
self.__copytoraw_inp( )
self.__rawdata_dir_inp( )
self.__lazy_inp( )
self.__bdfflags_inp( )
self.__ocorr_mode_inp( )
self.__asis_inp( )
self.__pipelinemode_inp( )
self.__dryrun_inp( )
self.__acceptresults_inp( )
#--------- tget function ----------------------------------------------------------
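# Reload parameters from a previously saved invocation: the named file, or
# hsd_restoredata.last when no file is given; if neither exists, reset to defaults.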
@static_var('state', __sf__('casa_inp_go_state'))
def tget(self,file=None):
from casashell.private.stack_manip import find_frame
from runpy import run_path
filename = None
if file is None:
if os.path.isfile("hsd_restoredata.last"):
filename = "hsd_restoredata.last"
elif isinstance(file, str):
if os.path.isfile(file):
filename = file
if filename is not None:
glob = find_frame( )
newglob = run_path( filename, init_globals={ } )
for i in newglob:
glob[i] = newglob[i]
self.tget.state['last'] = self
else:
print("could not find last file, setting defaults instead...")
self.set_global_defaults( )
def __call__( self, vis=None, session=None, products_dir=None, copytoraw=None, rawdata_dir=None, lazy=None, bdfflags=None, ocorr_mode=None, asis=None, pipelinemode=None, dryrun=None, acceptresults=None ):
def noobj(s):
if s.startswith('<') and s.endswith('>'):
return "None"
else:
return s
_prefile = os.path.realpath('hsd_restoredata.pre')
_postfile = os.path.realpath('hsd_restoredata.last')
_return_result_ = None
_arguments = [vis,session,products_dir,copytoraw,rawdata_dir,lazy,bdfflags,ocorr_mode,asis,pipelinemode,dryrun,acceptresults]
_invocation_parameters = OrderedDict( )
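# Two invocation paths: if any argument was supplied explicitly, build the parameter
# set function-call style (explicit values plus task defaults); otherwise read every
# parameter from the CASA global frame (inp/go semantics).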
if any(map(lambda x: x is not None,_arguments)):
# invoke python style
# set the non sub-parameters that are not None
local_global = { }
if vis is not None: local_global['vis'] = vis
if session is not None: local_global['session'] = session
if products_dir is not None: local_global['products_dir'] = products_dir
if copytoraw is not None: local_global['copytoraw'] = copytoraw
if rawdata_dir is not None: local_global['rawdata_dir'] = rawdata_dir
if lazy is not None: local_global['lazy'] = lazy
if bdfflags is not None: local_global['bdfflags'] = bdfflags
if ocorr_mode is not None: local_global['ocorr_mode'] = ocorr_mode
if asis is not None: local_global['asis'] = asis
if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode
# the invocation parameters for the non-subparameters can now be set - this picks up those defaults
_invocation_parameters['vis'] = self.__vis( local_global )
_invocation_parameters['session'] = self.__session( local_global )
_invocation_parameters['products_dir'] = self.__products_dir( local_global )
_invocation_parameters['copytoraw'] = self.__copytoraw( local_global )
_invocation_parameters['rawdata_dir'] = self.__rawdata_dir( local_global )
_invocation_parameters['lazy'] = self.__lazy( local_global )
_invocation_parameters['bdfflags'] = self.__bdfflags( local_global )
_invocation_parameters['ocorr_mode'] = self.__ocorr_mode( local_global )
_invocation_parameters['asis'] = self.__asis( local_global )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )
# the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
_invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun
_invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults
else:
# invoke with inp/go semantics
_invocation_parameters['vis'] = self.__vis( self.__globals_( ) )
_invocation_parameters['session'] = self.__session( self.__globals_( ) )
_invocation_parameters['products_dir'] = self.__products_dir( self.__globals_( ) )
_invocation_parameters['copytoraw'] = self.__copytoraw( self.__globals_( ) )
_invocation_parameters['rawdata_dir'] = self.__rawdata_dir( self.__globals_( ) )
_invocation_parameters['lazy'] = self.__lazy( self.__globals_( ) )
_invocation_parameters['bdfflags'] = self.__bdfflags( self.__globals_( ) )
_invocation_parameters['ocorr_mode'] = self.__ocorr_mode( self.__globals_( ) )
_invocation_parameters['asis'] = self.__asis( self.__globals_( ) )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( self.__globals_( ) )
_invocation_parameters['dryrun'] = self.__dryrun( self.__globals_( ) )
_invocation_parameters['acceptresults'] = self.__acceptresults( self.__globals_( ) )
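# Record this invocation in hsd_restoredata.pre; it is renamed to hsd_restoredata.last
# below so that tget() can restore these settings later.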
try:
with open(_prefile,'w') as _f:
for _i in _invocation_parameters:
_f.write("%-13s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i]))))
_f.write("#hsd_restoredata( ")
count = 0
for _i in _invocation_parameters:
_f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i]))))
count += 1
if count < len(_invocation_parameters): _f.write(",")
_f.write(" )\n")
except: pass
try:
_return_result_ = _hsd_restoredata_t( _invocation_parameters['vis'],_invocation_parameters['session'],_invocation_parameters['products_dir'],_invocation_parameters['copytoraw'],_invocation_parameters['rawdata_dir'],_invocation_parameters['lazy'],_invocation_parameters['bdfflags'],_invocation_parameters['ocorr_mode'],_invocation_parameters['asis'],_invocation_parameters['pipelinemode'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] )
except Exception as e:
from traceback import format_exc
from casatasks import casalog
casalog.origin('hsd_restoredata')
casalog.post("Exception Reported: Error in hsd_restoredata: %s" % str(e),'SEVERE')
casalog.post(format_exc( ))
_return_result_ = False
try:
os.rename(_prefile,_postfile)
except: pass
return _return_result_
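# Expose the task as a single callable instance, matching the standard casashell task interface.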
hsd_restoredata = _hsd_restoredata( )