##################### generated by xml-casa (v2) from h_importdata.xml ##############
##################### 795b96556326052763b32c35aa3cf694 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.h.cli import h_importdata as _h_importdata_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
[docs]def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
class _h_importdata:
"""
h_importdata ---- Imports data into the interferometry pipeline
The h_importdata task loads the specified visibility data into the pipeline
context unpacking and / or converting it as necessary.
results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
the results object for the pipeline task is returned.
--------- parameter descriptions ---------------------------------------------
vis List of visibility data files. These may be ASDMs, tar files of ASDMs,
MSs, or tar files of MSs, If ASDM files are specified, they will be
converted to MS format.
example: vis=['X227.ms', 'asdms.tar.gz']
session List of sessions to which the visibility files belong. Defaults to a
single session containing all the visibility files, otherwise
a session must be assigned to each vis file.
example: session=['session_1', 'session_2']
pipelinemode The pipeline operating mode. In 'automatic' mode the pipeline
determines the values of all context defined pipeline inputs
automatically. In 'interactive' mode the user can set the pipeline
context defined parameters manually. In 'getinputs' mode the user
can check the settings of all pipeline parameters without running
the task.
asis ASDM tables to convert as is
default: 'Antenna Station Receiver CalAtmosphere'
Can only be set in pipelinemode='interactive'
example: 'Receiver', ''
process_caldevice Ingest the ASDM caldevice table.
Can only be set in pipelinemode='interactive'
overwrite Overwrite existing MSs on output.
Can only be set in pipelinemode='interactive'
nocopy When importing an MS, disable copying of the MS to the working
directory. Can only be set in pipelinemode='interactive'.
bdfflags Apply BDF flags on import. Can only be set in
pipelinemode='interactive'.
lazy Use the lazy import option. Can only be set in
pipelinemode='interactive'.
ocorr_mode Read in cross- and auto-correlation data(ca), cross-
correlation data only (co), or autocorrelation data only (ao).
createmms Create a multi-MeasurementSet ('true') ready for parallel
processing, or a standard MeasurementSet ('false'). The default setting
('automatic') creates an MMS if running in a cluster environment.
Can only be set in pipelinemode='interactive'
dryrun Run the task (False) or display task command (True)
acceptresults Add the results into the pipeline context
--------- examples -----------------------------------------------------------
Examples
1. Load an ASDM list in the ../rawdata subdirectory into the context"
h_importdata(vis=['../rawdata/uid___A002_X30a93d_X43e',
'../rawdata/uid_A002_x30a93d_X44e'])
2. Load an MS in the current directory into the context:
h_importdata(vis=[uid___A002_X30a93d_X43e.ms])
3. Load a tarred ASDM in ../rawdata into the context:
h_importdata(vis=['../rawdata/uid___A002_X30a93d_X43e.tar.gz'])
4. Check the h_importdata inputs, then import the data:
myvislist = ['uid___A002_X30a93d_X43e.ms', 'uid_A002_x30a93d_X44e.ms']
h_importdata(vis=myvislist, pipelinemode='getinputs')
h_importdata(vis=myvislist)
5. Load an ASDM but check the results before accepting them into the context.
results = h_importdata(vis=['uid___A002_X30a93d_X43e.ms'],
acceptresults=False)
results.accept()
6. Run in dryrun mode before running for real
results = h_importdata(vis=['uid___A002_X30a93d_X43e.ms'], dryrun=True)
results = h_importdata(vis=['uid___A002_X30a93d_X43e.ms'])
"""
_info_group_ = """pipeline"""
_info_desc_ = """Imports data into the interferometry pipeline"""
__schema = {'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'session': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'asis': {'type': 'cStr', 'coerce': _coerce.to_str}, 'process_caldevice': {'type': 'cBool'}, 'overwrite': {'type': 'cBool'}, 'nocopy': {'type': 'cBool'}, 'bdfflags': {'type': 'cBool'}, 'lazy': {'type': 'cBool'}, 'ocorr_mode': {'type': 'cStr', 'coerce': _coerce.to_str}, 'createmms': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'true', 'false' ]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}}
def __init__(self):
self.__stdout = None
self.__stderr = None
self.__root_frame_ = None
def __globals_(self):
if self.__root_frame_ is None:
self.__root_frame_ = _find_frame( )
assert self.__root_frame_ is not None, "could not find CASAshell global frame"
return self.__root_frame_
def __to_string_(self,value):
if type(value) is str:
return "'%s'" % value
else:
return str(value)
def __validate_(self,doc,schema):
return _pc.validate(doc,schema)
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
out = self.__stdout or sys.stdout
description = description_str.split( )
prefix_width = 23 + 20 + 4
output = [ ]
addon = ''
first_addon = True
while len(description) > 0:
## starting a new line.....................................................................
if len(output) == 0:
## for first line add parameter information............................................
if len(param_prefix)-formatting_chars > prefix_width - 1:
output.append(param_prefix)
continue
addon = param_prefix + ' #'
first_addon = True
addon_formatting = formatting_chars
else:
## for subsequent lines space over prefix width........................................
addon = (' ' * prefix_width) + '#'
first_addon = False
addon_formatting = 0
## if first word of description puts us over the screen width, bail........................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
## if we're doing the first line make sure it's output.................................
if first_addon: output.append(addon)
break
while len(description) > 0:
## if the next description word puts us over break for the next line...................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break
addon = addon + ' ' + description[0]
description.pop(0)
output.append(addon)
out.write('\n'.join(output) + '\n')
#--------- return nonsubparam values ----------------------------------------------
def __session_dflt( self, glb ):
return [ ]
def __session( self, glb ):
if 'session' in glb: return glb['session']
return [ ]
def __vis_dflt( self, glb ):
return [ ]
def __vis( self, glb ):
if 'vis' in glb: return glb['vis']
return [ ]
def __lazy_dflt( self, glb ):
return False
def __lazy( self, glb ):
if 'lazy' in glb: return glb['lazy']
return False
def __createmms_dflt( self, glb ):
return 'automatic'
def __createmms( self, glb ):
if 'createmms' in glb: return glb['createmms']
return 'automatic'
def __pipelinemode_dflt( self, glb ):
return 'automatic'
def __pipelinemode( self, glb ):
if 'pipelinemode' in glb: return glb['pipelinemode']
return 'automatic'
#--------- return inp/go default --------------------------------------------------
def __dryrun_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
return None
def __asis_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return ""
if self.__pipelinemode( glb ) == "getinputs": return ""
return None
def __acceptresults_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
return None
def __ocorr_mode_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return "ca"
if self.__pipelinemode( glb ) == "getinputs": return "ca"
return None
def __bdfflags_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
if self.__pipelinemode( glb ) == "getinputs": return bool(True)
return None
def __overwrite_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
if self.__pipelinemode( glb ) == "getinputs": return bool(False)
return None
def __process_caldevice_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
if self.__pipelinemode( glb ) == "getinputs": return bool(False)
return None
def __nocopy_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
if self.__pipelinemode( glb ) == "getinputs": return bool(False)
return None
#--------- return subparam values -------------------------------------------------
def __asis( self, glb ):
if 'asis' in glb: return glb['asis']
dflt = self.__asis_dflt( glb )
if dflt is not None: return dflt
return ''
def __process_caldevice( self, glb ):
if 'process_caldevice' in glb: return glb['process_caldevice']
dflt = self.__process_caldevice_dflt( glb )
if dflt is not None: return dflt
return False
def __overwrite( self, glb ):
if 'overwrite' in glb: return glb['overwrite']
dflt = self.__overwrite_dflt( glb )
if dflt is not None: return dflt
return False
def __nocopy( self, glb ):
if 'nocopy' in glb: return glb['nocopy']
dflt = self.__nocopy_dflt( glb )
if dflt is not None: return dflt
return False
def __bdfflags( self, glb ):
if 'bdfflags' in glb: return glb['bdfflags']
dflt = self.__bdfflags_dflt( glb )
if dflt is not None: return dflt
return True
def __ocorr_mode( self, glb ):
if 'ocorr_mode' in glb: return glb['ocorr_mode']
dflt = self.__ocorr_mode_dflt( glb )
if dflt is not None: return dflt
return 'ca'
def __dryrun( self, glb ):
if 'dryrun' in glb: return glb['dryrun']
dflt = self.__dryrun_dflt( glb )
if dflt is not None: return dflt
return False
def __acceptresults( self, glb ):
if 'acceptresults' in glb: return glb['acceptresults']
dflt = self.__acceptresults_dflt( glb )
if dflt is not None: return dflt
return True
#--------- subparam inp output ----------------------------------------------------
def __vis_inp(self):
description = 'List of input visibility data'
value = self.__vis( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'vis': value},{'vis': self.__schema['vis']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-20.20s = %s%-23s%s' % ('vis',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __session_inp(self):
description = 'List of visibility data sessions'
value = self.__session( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'session': value},{'session': self.__schema['session']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-20.20s = %s%-23s%s' % ('session',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __pipelinemode_inp(self):
description = 'The pipeline operating mode'
value = self.__pipelinemode( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('\x1B[1m\x1B[47m%-20.20s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post),description,13+len(pre)+len(post))
def __asis_inp(self):
if self.__asis_dflt( self.__globals_( ) ) is not None:
description = 'Extra ASDM tables to convert as is'
value = self.__asis( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'asis': value},{'asis': self.__schema['asis']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('asis',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __process_caldevice_inp(self):
if self.__process_caldevice_dflt( self.__globals_( ) ) is not None:
description = 'Import the caldevice table from the ASDM'
value = self.__process_caldevice( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'process_caldevice': value},{'process_caldevice': self.__schema['process_caldevice']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('process_caldevice',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __overwrite_inp(self):
if self.__overwrite_dflt( self.__globals_( ) ) is not None:
description = 'Overwrite existing files on import'
value = self.__overwrite( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'overwrite': value},{'overwrite': self.__schema['overwrite']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('overwrite',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __nocopy_inp(self):
if self.__nocopy_dflt( self.__globals_( ) ) is not None:
description = 'Disable copying of MS to working directory'
value = self.__nocopy( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'nocopy': value},{'nocopy': self.__schema['nocopy']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('nocopy',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __bdfflags_inp(self):
if self.__bdfflags_dflt( self.__globals_( ) ) is not None:
description = 'Apply BDF flags on import'
value = self.__bdfflags( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'bdfflags': value},{'bdfflags': self.__schema['bdfflags']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('bdfflags',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __lazy_inp(self):
description = 'Use the lazy import option'
value = self.__lazy( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'lazy': value},{'lazy': self.__schema['lazy']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-20.20s = %s%-23s%s' % ('lazy',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __ocorr_mode_inp(self):
if self.__ocorr_mode_dflt( self.__globals_( ) ) is not None:
description = 'Correlation data mode'
value = self.__ocorr_mode( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'ocorr_mode': value},{'ocorr_mode': self.__schema['ocorr_mode']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('ocorr_mode',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __createmms_inp(self):
description = 'Create an MMS'
value = self.__createmms( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'createmms': value},{'createmms': self.__schema['createmms']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-20.20s = %s%-23s%s' % ('createmms',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __dryrun_inp(self):
if self.__dryrun_dflt( self.__globals_( ) ) is not None:
description = 'Run the task (False) or display task command (True)'
value = self.__dryrun( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __acceptresults_inp(self):
if self.__acceptresults_dflt( self.__globals_( ) ) is not None:
description = 'Add the results into the pipeline context'
value = self.__acceptresults( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-17.17s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def set_global_defaults(self):
self.set_global_defaults.state['last'] = self
glb = self.__globals_( )
if 'dryrun' in glb: del glb['dryrun']
if 'lazy' in glb: del glb['lazy']
if 'asis' in glb: del glb['asis']
if 'pipelinemode' in glb: del glb['pipelinemode']
if 'createmms' in glb: del glb['createmms']
if 'vis' in glb: del glb['vis']
if 'acceptresults' in glb: del glb['acceptresults']
if 'ocorr_mode' in glb: del glb['ocorr_mode']
if 'bdfflags' in glb: del glb['bdfflags']
if 'session' in glb: del glb['session']
if 'overwrite' in glb: del glb['overwrite']
if 'process_caldevice' in glb: del glb['process_caldevice']
if 'nocopy' in glb: del glb['nocopy']
#--------- inp function -----------------------------------------------------------
def inp(self):
print("# h_importdata -- %s" % self._info_desc_)
self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
self.__vis_inp( )
self.__session_inp( )
self.__pipelinemode_inp( )
self.__asis_inp( )
self.__process_caldevice_inp( )
self.__overwrite_inp( )
self.__nocopy_inp( )
self.__bdfflags_inp( )
self.__lazy_inp( )
self.__ocorr_mode_inp( )
self.__createmms_inp( )
self.__dryrun_inp( )
self.__acceptresults_inp( )
#--------- tget function ----------------------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def tget(self,file=None):
from casashell.private.stack_manip import find_frame
from runpy import run_path
filename = None
if file is None:
if os.path.isfile("h_importdata.last"):
filename = "h_importdata.last"
elif isinstance(file, str):
if os.path.isfile(file):
filename = file
if filename is not None:
glob = find_frame( )
newglob = run_path( filename, init_globals={ } )
for i in newglob:
glob[i] = newglob[i]
self.tget.state['last'] = self
else:
print("could not find last file, setting defaults instead...")
self.set_global_defaults( )
def __call__( self, vis=None, session=None, pipelinemode=None, asis=None, process_caldevice=None, overwrite=None, nocopy=None, bdfflags=None, lazy=None, ocorr_mode=None, createmms=None, dryrun=None, acceptresults=None ):
def noobj(s):
if s.startswith('<') and s.endswith('>'):
return "None"
else:
return s
_prefile = os.path.realpath('h_importdata.pre')
_postfile = os.path.realpath('h_importdata.last')
_return_result_ = None
_arguments = [vis,session,pipelinemode,asis,process_caldevice,overwrite,nocopy,bdfflags,lazy,ocorr_mode,createmms,dryrun,acceptresults]
_invocation_parameters = OrderedDict( )
if any(map(lambda x: x is not None,_arguments)):
# invoke python style
# set the non sub-parameters that are not None
local_global = { }
if vis is not None: local_global['vis'] = vis
if session is not None: local_global['session'] = session
if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode
if lazy is not None: local_global['lazy'] = lazy
if createmms is not None: local_global['createmms'] = createmms
# the invocation parameters for the non-subparameters can now be set - this picks up those defaults
_invocation_parameters['vis'] = self.__vis( local_global )
_invocation_parameters['session'] = self.__session( local_global )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )
_invocation_parameters['lazy'] = self.__lazy( local_global )
_invocation_parameters['createmms'] = self.__createmms( local_global )
# the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
_invocation_parameters['asis'] = self.__asis( _invocation_parameters ) if asis is None else asis
_invocation_parameters['process_caldevice'] = self.__process_caldevice( _invocation_parameters ) if process_caldevice is None else process_caldevice
_invocation_parameters['overwrite'] = self.__overwrite( _invocation_parameters ) if overwrite is None else overwrite
_invocation_parameters['nocopy'] = self.__nocopy( _invocation_parameters ) if nocopy is None else nocopy
_invocation_parameters['bdfflags'] = self.__bdfflags( _invocation_parameters ) if bdfflags is None else bdfflags
_invocation_parameters['ocorr_mode'] = self.__ocorr_mode( _invocation_parameters ) if ocorr_mode is None else ocorr_mode
_invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun
_invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults
else:
# invoke with inp/go semantics
_invocation_parameters['vis'] = self.__vis( self.__globals_( ) )
_invocation_parameters['session'] = self.__session( self.__globals_( ) )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( self.__globals_( ) )
_invocation_parameters['asis'] = self.__asis( self.__globals_( ) )
_invocation_parameters['process_caldevice'] = self.__process_caldevice( self.__globals_( ) )
_invocation_parameters['overwrite'] = self.__overwrite( self.__globals_( ) )
_invocation_parameters['nocopy'] = self.__nocopy( self.__globals_( ) )
_invocation_parameters['bdfflags'] = self.__bdfflags( self.__globals_( ) )
_invocation_parameters['lazy'] = self.__lazy( self.__globals_( ) )
_invocation_parameters['ocorr_mode'] = self.__ocorr_mode( self.__globals_( ) )
_invocation_parameters['createmms'] = self.__createmms( self.__globals_( ) )
_invocation_parameters['dryrun'] = self.__dryrun( self.__globals_( ) )
_invocation_parameters['acceptresults'] = self.__acceptresults( self.__globals_( ) )
try:
with open(_prefile,'w') as _f:
for _i in _invocation_parameters:
_f.write("%-17s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i]))))
_f.write("#h_importdata( ")
count = 0
for _i in _invocation_parameters:
_f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i]))))
count += 1
if count < len(_invocation_parameters): _f.write(",")
_f.write(" )\n")
except: pass
try:
_return_result_ = _h_importdata_t( _invocation_parameters['vis'],_invocation_parameters['session'],_invocation_parameters['pipelinemode'],_invocation_parameters['asis'],_invocation_parameters['process_caldevice'],_invocation_parameters['overwrite'],_invocation_parameters['nocopy'],_invocation_parameters['bdfflags'],_invocation_parameters['lazy'],_invocation_parameters['ocorr_mode'],_invocation_parameters['createmms'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] )
except Exception as e:
from traceback import format_exc
from casatasks import casalog
casalog.origin('h_importdata')
casalog.post("Exception Reported: Error in h_importdata: %s" % str(e),'SEVERE')
casalog.post(format_exc( ))
_return_result_ = False
try:
os.rename(_prefile,_postfile)
except: pass
return _return_result_
h_importdata = _h_importdata( )