##################### generated by xml-casa (v2) from hifas_imageprecheck.xml #######
##################### 601460e00cb5bbd639cf142e5804e797 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hifas.cli import hifas_imageprecheck as _hifas_imageprecheck_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
[docs]def static_var(varname, value):
def decorate(func):
setattr(func, varname, value)
return func
return decorate
class _hifas_imageprecheck:
"""
hifas_imageprecheck ---- Calculates the best robust value and Briggs weighting parameter to achieve sensitivity and angular resolution goals.
In this task, the representative source and the spw containing the
representative frequency selected by the PI in the OT are used to calculate
the synthesized beam and to make sensitivity estimates for the aggregate
bandwidth and representative bandwidth for several values of the robust parameter.
This information is reported in a table in the weblog. If no representative
target/frequency information is available, it defaults to the first target
and center of first spw in the data (i.e. pre-Cycle 5 data does not have
this information available). The best Briggs weighting parameter to achieve
the PI's desired angular resolution is chosen automatically.
See the User's guide for further details.
results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
the results object for the pipeline task is returned.
--------- parameter descriptions ---------------------------------------------
vis The list of input MeasurementSets. Defaults to the list of
MeasurementSets specified in the h_init or hif_importdata task.
'': use all MeasurementSets in the context
Examples: 'ngc5921.ms', ['ngc5921a.ms', ngc5921b.ms', 'ngc5921c.ms']
desired_angular_resolution User specified angular resolution goal string.
'': automatic from performance parameters (default)
Example: '1.0arcsec'
calcsb Force (re-)calculation of sensitivities and beams
parallel Use MPI cluster where possible
pipelinemode The pipeline operating mode.
In 'automatic' mode the pipeline determines the values of all
context defined pipeline inputs automatically.
In 'interactive' mode the user can set the pipeline context
defined parameters manually.
In 'getinputs' mode the user can check the settings of all
pipeline parameters without running the task.
dryrun Run the task (False) or just display the command (True)
acceptresults Add the results of the task to the pipeline context (True) or
reject them (False).
--------- examples -----------------------------------------------------------
"""
_info_group_ = """pipeline"""
_info_desc_ = """Calculates the best robust value and Briggs weighting parameter to achieve sensitivity and angular resolution goals."""
__schema = {'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'desired_angular_resolution': {'type': 'cStr', 'coerce': _coerce.to_str}, 'calcsb': {'type': 'cBool'}, 'parallel': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'true', 'false' ]}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}}
def __init__(self):
self.__stdout = None
self.__stderr = None
self.__root_frame_ = None
def __globals_(self):
if self.__root_frame_ is None:
self.__root_frame_ = _find_frame( )
assert self.__root_frame_ is not None, "could not find CASAshell global frame"
return self.__root_frame_
def __to_string_(self,value):
if type(value) is str:
return "'%s'" % value
else:
return str(value)
def __validate_(self,doc,schema):
return _pc.validate(doc,schema)
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
out = self.__stdout or sys.stdout
description = description_str.split( )
prefix_width = 23 + 26 + 4
output = [ ]
addon = ''
first_addon = True
while len(description) > 0:
## starting a new line.....................................................................
if len(output) == 0:
## for first line add parameter information............................................
if len(param_prefix)-formatting_chars > prefix_width - 1:
output.append(param_prefix)
continue
addon = param_prefix + ' #'
first_addon = True
addon_formatting = formatting_chars
else:
## for subsequent lines space over prefix width........................................
addon = (' ' * prefix_width) + '#'
first_addon = False
addon_formatting = 0
## if first word of description puts us over the screen width, bail........................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width:
## if we're doing the first line make sure it's output.................................
if first_addon: output.append(addon)
break
while len(description) > 0:
## if the next description word puts us over break for the next line...................
if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break
addon = addon + ' ' + description[0]
description.pop(0)
output.append(addon)
out.write('\n'.join(output) + '\n')
#--------- return nonsubparam values ----------------------------------------------
def __desired_angular_resolution_dflt( self, glb ):
return ''
def __desired_angular_resolution( self, glb ):
if 'desired_angular_resolution' in glb: return glb['desired_angular_resolution']
return ''
def __parallel_dflt( self, glb ):
return 'automatic'
def __parallel( self, glb ):
if 'parallel' in glb: return glb['parallel']
return 'automatic'
def __calcsb_dflt( self, glb ):
return False
def __calcsb( self, glb ):
if 'calcsb' in glb: return glb['calcsb']
return False
def __pipelinemode_dflt( self, glb ):
return 'automatic'
def __pipelinemode( self, glb ):
if 'pipelinemode' in glb: return glb['pipelinemode']
return 'automatic'
#--------- return inp/go default --------------------------------------------------
def __acceptresults_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(True)
return None
def __dryrun_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return bool(False)
return None
def __vis_dflt( self, glb ):
if self.__pipelinemode( glb ) == "interactive": return []
if self.__pipelinemode( glb ) == "getinputs": return []
return None
#--------- return subparam values -------------------------------------------------
def __vis( self, glb ):
if 'vis' in glb: return glb['vis']
dflt = self.__vis_dflt( glb )
if dflt is not None: return dflt
return [ ]
def __dryrun( self, glb ):
if 'dryrun' in glb: return glb['dryrun']
dflt = self.__dryrun_dflt( glb )
if dflt is not None: return dflt
return False
def __acceptresults( self, glb ):
if 'acceptresults' in glb: return glb['acceptresults']
dflt = self.__acceptresults_dflt( glb )
if dflt is not None: return dflt
return True
#--------- subparam inp output ----------------------------------------------------
def __vis_inp(self):
if self.__vis_dflt( self.__globals_( ) ) is not None:
description = 'List of input MeasurementSets'
value = self.__vis( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'vis': value},{'vis': self.__schema['vis']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-23.23s =\x1B[0m %s%-23s%s' % ('vis',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __desired_angular_resolution_inp(self):
description = 'User specified angular resolution goal'
value = self.__desired_angular_resolution( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'desired_angular_resolution': value},{'desired_angular_resolution': self.__schema['desired_angular_resolution']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-26.26s = %s%-23s%s' % ('desired_angular_resolution',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __calcsb_inp(self):
description = 'Force (re-)calculation of sensitivities and beams'
value = self.__calcsb( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'calcsb': value},{'calcsb': self.__schema['calcsb']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-26.26s = %s%-23s%s' % ('calcsb',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __parallel_inp(self):
description = 'Use MPI cluster where possible'
value = self.__parallel( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'parallel': value},{'parallel': self.__schema['parallel']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('%-26.26s = %s%-23s%s' % ('parallel',pre,self.__to_string_(value),post),description,0+len(pre)+len(post))
def __pipelinemode_inp(self):
description = 'The pipeline operating mode'
value = self.__pipelinemode( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output('\x1B[1m\x1B[47m%-26.26s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post),description,13+len(pre)+len(post))
def __dryrun_inp(self):
if self.__dryrun_dflt( self.__globals_( ) ) is not None:
description = 'Run the task (False) or just display the command (True)'
value = self.__dryrun( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-23.23s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
def __acceptresults_inp(self):
if self.__acceptresults_dflt( self.__globals_( ) ) is not None:
description = 'Add the results into the pipeline context'
value = self.__acceptresults( self.__globals_( ) )
(pre,post) = ('','') if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}) else ('\x1B[91m','\x1B[0m')
self.__do_inp_output(' \x1B[92m%-23.23s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post),description,9+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def set_global_defaults(self):
self.set_global_defaults.state['last'] = self
glb = self.__globals_( )
if 'dryrun' in glb: del glb['dryrun']
if 'pipelinemode' in glb: del glb['pipelinemode']
if 'desired_angular_resolution' in glb: del glb['desired_angular_resolution']
if 'vis' in glb: del glb['vis']
if 'acceptresults' in glb: del glb['acceptresults']
if 'calcsb' in glb: del glb['calcsb']
if 'parallel' in glb: del glb['parallel']
#--------- inp function -----------------------------------------------------------
def inp(self):
print("# hifas_imageprecheck -- %s" % self._info_desc_)
self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24))
self.__vis_inp( )
self.__desired_angular_resolution_inp( )
self.__calcsb_inp( )
self.__parallel_inp( )
self.__pipelinemode_inp( )
self.__dryrun_inp( )
self.__acceptresults_inp( )
#--------- tget function ----------------------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def tget(self,file=None):
from casashell.private.stack_manip import find_frame
from runpy import run_path
filename = None
if file is None:
if os.path.isfile("hifas_imageprecheck.last"):
filename = "hifas_imageprecheck.last"
elif isinstance(file, str):
if os.path.isfile(file):
filename = file
if filename is not None:
glob = find_frame( )
newglob = run_path( filename, init_globals={ } )
for i in newglob:
glob[i] = newglob[i]
self.tget.state['last'] = self
else:
print("could not find last file, setting defaults instead...")
self.set_global_defaults( )
def __call__( self, vis=None, desired_angular_resolution=None, calcsb=None, parallel=None, pipelinemode=None, dryrun=None, acceptresults=None ):
def noobj(s):
if s.startswith('<') and s.endswith('>'):
return "None"
else:
return s
_prefile = os.path.realpath('hifas_imageprecheck.pre')
_postfile = os.path.realpath('hifas_imageprecheck.last')
_return_result_ = None
_arguments = [vis,desired_angular_resolution,calcsb,parallel,pipelinemode,dryrun,acceptresults]
_invocation_parameters = OrderedDict( )
if any(map(lambda x: x is not None,_arguments)):
# invoke python style
# set the non sub-parameters that are not None
local_global = { }
if desired_angular_resolution is not None: local_global['desired_angular_resolution'] = desired_angular_resolution
if calcsb is not None: local_global['calcsb'] = calcsb
if parallel is not None: local_global['parallel'] = parallel
if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode
# the invocation parameters for the non-subparameters can now be set - this picks up those defaults
_invocation_parameters['desired_angular_resolution'] = self.__desired_angular_resolution( local_global )
_invocation_parameters['calcsb'] = self.__calcsb( local_global )
_invocation_parameters['parallel'] = self.__parallel( local_global )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )
# the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
_invocation_parameters['vis'] = self.__vis( _invocation_parameters ) if vis is None else vis
_invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun
_invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults
else:
# invoke with inp/go semantics
_invocation_parameters['vis'] = self.__vis( self.__globals_( ) )
_invocation_parameters['desired_angular_resolution'] = self.__desired_angular_resolution( self.__globals_( ) )
_invocation_parameters['calcsb'] = self.__calcsb( self.__globals_( ) )
_invocation_parameters['parallel'] = self.__parallel( self.__globals_( ) )
_invocation_parameters['pipelinemode'] = self.__pipelinemode( self.__globals_( ) )
_invocation_parameters['dryrun'] = self.__dryrun( self.__globals_( ) )
_invocation_parameters['acceptresults'] = self.__acceptresults( self.__globals_( ) )
try:
with open(_prefile,'w') as _f:
for _i in _invocation_parameters:
_f.write("%-26s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i]))))
_f.write("#hifas_imageprecheck( ")
count = 0
for _i in _invocation_parameters:
_f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i]))))
count += 1
if count < len(_invocation_parameters): _f.write(",")
_f.write(" )\n")
except: pass
try:
_return_result_ = _hifas_imageprecheck_t( _invocation_parameters['vis'],_invocation_parameters['desired_angular_resolution'],_invocation_parameters['calcsb'],_invocation_parameters['parallel'],_invocation_parameters['pipelinemode'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] )
except Exception as e:
from traceback import format_exc
from casatasks import casalog
casalog.origin('hifas_imageprecheck')
casalog.post("Exception Reported: Error in hifas_imageprecheck: %s" % str(e),'SEVERE')
casalog.post(format_exc( ))
_return_result_ = False
try:
os.rename(_prefile,_postfile)
except: pass
return _return_result_
hifas_imageprecheck = _hifas_imageprecheck( )