##################### generated by xml-casa (v2) from hsd_skycal.xml ################
##################### a80c48afbe05f7a1313eec333fe390d4 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.hsd.cli import hsd_skycal as _hsd_skycal_t
from collections import OrderedDict
import numpy
import sys
import os
import shutil
def static_var(varname, value):
    """Return a decorator that stores ``value`` on a function as attribute ``varname``.

    Parameters:
        varname -- name of the attribute to set on the decorated function
        value   -- object bound to that attribute

    Returns:
        A decorator that sets the attribute and hands back the very same
        function object (the function is not wrapped).
    """
    def decorate(func):
        setattr(func, varname, value)
        return func
    return decorate
class _hsd_skycal:
"""
hsd_skycal ---- Calibrate data
The hsd_skycal generates a caltable for sky calibration that stores
reference spectra, which is to be subtracted from on-source spectra to filter
out non-source contribution.
Output:
results -- If pipeline mode is 'getinputs' then None is returned. Otherwise
the results object for the pipeline task is returned.
--------- parameter descriptions ---------------------------------------------
calmode Calibration mode. Available options are 'auto' (default),
'ps', 'otf', and 'otfraster'. When 'auto' is set, the task will
use preset calibration mode that is determined by inspecting data.
'ps' mode is simple position switching using explicit reference
scans. Other two modes, 'otf' and 'otfraster', will generate
reference data from scans at the edge of the map. Those modes
are intended for OTF observation and the former is defined for
generic scanning pattern such as Lissajous, while the latter is
specific use for raster scan.
options: 'auto', 'ps', 'otf', 'otfraster'
fraction Sub-parameter for calmode. Edge marking parameter for
'otf' and 'otfraster' mode. It specifies a number of OFF scans
as a fraction of total number of data points.
options: String style like '20%', or float value less than 1.0.
For 'otfraster' mode, you can also specify 'auto'.
noff Sub-parameter for calmode. Edge marking parameter for 'otfraster'
mode. It specifies the number of OFF scans near the edge directly,
instead of deriving it from the fractional value given by 'fraction'.
If it is set, it takes precedence over the 'fraction' setting.
options: any positive integer value
width Sub-parameter for calmode. Edge marking parameter for 'otf'
mode. It specifies pixel width with respect to a median spatial
separation between neighboring two data in time. Default will
be fine in most cases.
options: any float value
elongated Sub-parameter for calmode. Edge marking parameter for
'otf' mode. Please set True only if observed area is elongated
in one direction.
pipelinemode The pipeline operating mode. In 'automatic' mode the
pipeline determines the values of all context defined pipeline inputs
automatically. In 'interactive' mode the user can set the pipeline
context defined parameters manually. In 'getinputs' mode the user
can check the settings of all pipeline parameters without running
the task.
infiles List of data files. These must be names of MeasurementSets that
are registered to the context via the hsd_importdata task.
example: infiles=['X227.ms', 'X228.ms']
field Data selection by field name.
spw Data selection by spw. (default all spws)
example: '3,4' (generate caltable for spw 3 and 4)
['0','2'] (spw 0 for first data, 2 for second)
scan Data selection by scan number. (default all scans)
example: '22,23' (use scan 22 and 23 for calibration)
['22','24'] (scan 22 for first data, 24 for second)
dryrun Generate the commands to be run but do not execute them (True)
or run the commands (False).
acceptresults Add the results of the task to the pipeline context (True) or
reject them (False).
--------- examples -----------------------------------------------------------
1. Generate caltables for all data managed by context.
default(hsd_skycal)
hsd_skycal()
"""
# Descriptive metadata strings; inp() prints _info_desc_ in its header line.
_info_group_ = """pipeline"""
_info_desc_ = """Calibrate data"""
# Per-parameter schema: every *_inp method passes the entry for its parameter,
# together with the current value, to __validate_ (which calls _pc.validate).
__schema = {'calmode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'auto', 'ps', 'otf', 'otfraster' ]}, 'fraction': {'type': 'cVariant', 'coerce': [_coerce.to_variant]}, 'noff': {'type': 'cInt'}, 'width': {'type': 'cFloat', 'coerce': _coerce.to_float}, 'elongated': {'type': 'cBool'}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'infiles': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'field': {'type': 'cStr', 'coerce': _coerce.to_str}, 'spw': {'anyof': [{'type': 'cStr', 'coerce': _coerce.to_str}, {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}]}, 'scan': {'anyof': [{'type': 'cStr', 'coerce': _coerce.to_str}, {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}}
def __init__(self):
    """Start with no stream overrides and no cached frame.

    __stdout      -- when falsy, __do_inp_output writes to sys.stdout instead
    __stderr      -- initialised here; not read by the code in this class
    __root_frame_ -- cache that __globals_ fills on first use
    """
    # One chained assignment binds all three attributes (left to right) to None.
    self.__stdout = self.__stderr = self.__root_frame_ = None
def __globals_(self):
    """Return the frame obtained from _find_frame(), looking it up only once.

    The result is cached in __root_frame_. The rest of the class uses the
    returned object as a mapping of parameter names to values.

    Raises:
        AssertionError -- if the lookup yields None.
    """
    frame = self.__root_frame_
    if frame is None:
        frame = _find_frame( )
        # Store before asserting, so the attribute mirrors whatever the lookup gave back.
        self.__root_frame_ = frame
        assert frame is not None, "could not find CASAshell global frame"
    return frame
def __to_string_(self,value):
    """Render ``value`` for display: exact ``str`` objects are single-quoted, anything else goes through str()."""
    # The exact-type test is deliberate here: str subclasses take the str() branch.
    return "'%s'" % value if type(value) is str else str(value)
def __validate_(self,doc,schema):
    """Check ``doc`` against ``schema`` by delegating to ``_pc.validate`` and return its result."""
    verdict = _pc.validate(doc,schema)
    return verdict
def __do_inp_output(self,param_prefix,description_str,formatting_chars):
    """Write one parameter entry of the inp() listing, word-wrapping its description.

    Parameters:
        param_prefix     -- already formatted "name = value" text (may contain
                            terminal escape sequences)
        description_str  -- description text; it is split on whitespace and
                            re-flowed behind a '#' marker
        formatting_chars -- number of characters in ``param_prefix`` that are
                            subtracted from its length when measuring width

    The text goes to ``self.__stdout`` when that is truthy, otherwise to
    ``sys.stdout``. Lines are limited by ``self.term_width``, which inp()
    sets before calling the *_inp methods.
    """
    stream = self.__stdout or sys.stdout
    words = description_str.split( )
    indent_width = 23 + 16 + 4
    lines = [ ]
    while words:
        if not lines:
            # First line: it carries the parameter text itself.
            if len(param_prefix) - formatting_chars > indent_width - 1:
                # Prefix is too wide to share a line; emit it alone and start over.
                lines.append(param_prefix)
                continue
            current = param_prefix + ' #'
            on_first_line = True
            invisible = formatting_chars
        else:
            # Continuation line: pad to the description column.
            current = (' ' * indent_width) + '#'
            on_first_line = False
            invisible = 0
        if len(current + words[0]) - invisible + 1 > self.term_width:
            # Not even one word fits: give up, but never lose the parameter line.
            if on_first_line:
                lines.append(current)
            break
        # Append words for as long as the next one still fits on this line.
        while words and len(current + words[0]) - invisible + 1 <= self.term_width:
            current = current + ' ' + words[0]
            del words[0]
        lines.append(current)
    stream.write('\n'.join(lines) + '\n')
#--------- return nonsubparam values ----------------------------------------------
def __calmode_dflt( self, glb ):
    """Return the default of 'calmode'; ``glb`` is accepted but not consulted."""
    return 'auto'
def __calmode( self, glb ):
    """Return ``glb['calmode']`` when that key is present, otherwise 'auto'."""
    return glb['calmode'] if 'calmode' in glb else 'auto'
def __pipelinemode_dflt( self, glb ):
    """Return the default of 'pipelinemode'; ``glb`` is accepted but not consulted."""
    return 'automatic'
def __pipelinemode( self, glb ):
    """Return ``glb['pipelinemode']`` when that key is present, otherwise 'automatic'."""
    return glb['pipelinemode'] if 'pipelinemode' in glb else 'automatic'
#--------- return inp/go default --------------------------------------------------
def __dryrun_dflt( self, glb ):
    """Default of 'dryrun': False when pipelinemode is 'interactive', otherwise None."""
    return False if self.__pipelinemode( glb ) == "interactive" else None
def __fraction_dflt( self, glb ):
    """Default of 'fraction': '10%' when calmode is 'otf' or 'otfraster', otherwise None."""
    mode = self.__calmode( glb )
    return "10%" if (mode == "otf" or mode == "otfraster") else None
def __elongated_dflt( self, glb ):
    """Default of 'elongated': False when calmode is 'otf', otherwise None."""
    return False if self.__calmode( glb ) == "otf" else None
def __field_dflt( self, glb ):
    """Default of 'field': '' when pipelinemode is 'interactive' or 'getinputs', otherwise None."""
    mode = self.__pipelinemode( glb )
    return "" if (mode == "interactive" or mode == "getinputs") else None
def __scan_dflt( self, glb ):
    """Default of 'scan': '' when pipelinemode is 'interactive' or 'getinputs', otherwise None."""
    mode = self.__pipelinemode( glb )
    return "" if (mode == "interactive" or mode == "getinputs") else None
def __acceptresults_dflt( self, glb ):
    """Default of 'acceptresults': True when pipelinemode is 'interactive', otherwise None."""
    return True if self.__pipelinemode( glb ) == "interactive" else None
def __noff_dflt( self, glb ):
    """Default of 'noff': -1 when calmode is 'otfraster', otherwise None."""
    return -1 if self.__calmode( glb ) == "otfraster" else None
def __width_dflt( self, glb ):
    """Default of 'width': 0.5 when calmode is 'otf', otherwise None."""
    return 0.5 if self.__calmode( glb ) == "otf" else None
def __spw_dflt( self, glb ):
    """Default of 'spw': '' when pipelinemode is 'interactive' or 'getinputs', otherwise None."""
    mode = self.__pipelinemode( glb )
    return "" if (mode == "interactive" or mode == "getinputs") else None
def __infiles_dflt( self, glb ):
    """Default of 'infiles': a new empty list when pipelinemode is 'interactive' or 'getinputs', otherwise None."""
    mode = self.__pipelinemode( glb )
    return [] if (mode == "interactive" or mode == "getinputs") else None
#--------- return subparam values -------------------------------------------------
def __fraction( self, glb ):
    """Value of 'fraction': the entry in ``glb`` if present, else the mode-dependent default, else '10%'."""
    if 'fraction' not in glb:
        fallback = self.__fraction_dflt( glb )
        return '10%' if fallback is None else fallback
    return glb['fraction']
def __noff( self, glb ):
    """Value of 'noff': the entry in ``glb`` if present, else the mode-dependent default, else -1."""
    if 'noff' not in glb:
        fallback = self.__noff_dflt( glb )
        return -1 if fallback is None else fallback
    return glb['noff']
def __width( self, glb ):
    """Value of 'width': the entry in ``glb`` if present, else the mode-dependent default, else 0.5."""
    if 'width' not in glb:
        fallback = self.__width_dflt( glb )
        return 0.5 if fallback is None else fallback
    return glb['width']
def __elongated( self, glb ):
    """Value of 'elongated': the entry in ``glb`` if present, else the mode-dependent default, else False."""
    if 'elongated' not in glb:
        fallback = self.__elongated_dflt( glb )
        return False if fallback is None else fallback
    return glb['elongated']
def __infiles( self, glb ):
    """Value of 'infiles': the entry in ``glb`` if present, else the mode-dependent default, else a new empty list."""
    if 'infiles' not in glb:
        fallback = self.__infiles_dflt( glb )
        return [ ] if fallback is None else fallback
    return glb['infiles']
def __field( self, glb ):
    """Value of 'field': the entry in ``glb`` if present, else the mode-dependent default, else ''."""
    if 'field' not in glb:
        fallback = self.__field_dflt( glb )
        return '' if fallback is None else fallback
    return glb['field']
def __spw( self, glb ):
    """Value of 'spw': the entry in ``glb`` if present, else the mode-dependent default, else ''."""
    if 'spw' not in glb:
        fallback = self.__spw_dflt( glb )
        return '' if fallback is None else fallback
    return glb['spw']
def __scan( self, glb ):
    """Value of 'scan': the entry in ``glb`` if present, else the mode-dependent default, else ''."""
    if 'scan' not in glb:
        fallback = self.__scan_dflt( glb )
        return '' if fallback is None else fallback
    return glb['scan']
def __dryrun( self, glb ):
    """Value of 'dryrun': the entry in ``glb`` if present, else the mode-dependent default, else False."""
    if 'dryrun' not in glb:
        fallback = self.__dryrun_dflt( glb )
        return False if fallback is None else fallback
    return glb['dryrun']
def __acceptresults( self, glb ):
    """Value of 'acceptresults': the entry in ``glb`` if present, else the mode-dependent default, else True."""
    if 'acceptresults' not in glb:
        fallback = self.__acceptresults_dflt( glb )
        return True if fallback is None else fallback
    return glb['acceptresults']
#--------- subparam inp output ----------------------------------------------------
def __calmode_inp(self):
    """Print the 'calmode' entry of the inp() listing (always shown).

    As in every *_inp method, a value that fails schema validation is wrapped
    in a terminal colour escape pair before being printed.
    """
    description = 'Calibration mode (default auto)'
    value = self.__calmode( self.__globals_( ) )
    if self.__validate_({'calmode': value},{'calmode': self.__schema['calmode']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = '\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % ('calmode',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,13+len(pre)+len(post))
def __fraction_inp(self):
    """Print the 'fraction' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__fraction_dflt( glb ) is None:
        return
    description = 'fraction of the OFF data to mark'
    value = self.__fraction( glb )
    if self.__validate_({'fraction': value},{'fraction': self.__schema['fraction']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('fraction',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __noff_inp(self):
    """Print the 'noff' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__noff_dflt( glb ) is None:
        return
    description = 'number of the OFF data to mark'
    value = self.__noff( glb )
    if self.__validate_({'noff': value},{'noff': self.__schema['noff']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('noff',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __width_inp(self):
    """Print the 'width' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__width_dflt( glb ) is None:
        return
    description = 'width of the pixel for edge detection'
    value = self.__width( glb )
    if self.__validate_({'width': value},{'width': self.__schema['width']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('width',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __elongated_inp(self):
    """Print the 'elongated' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__elongated_dflt( glb ) is None:
        return
    description = 'whether observed area is elongated in one direction or not'
    value = self.__elongated( glb )
    if self.__validate_({'elongated': value},{'elongated': self.__schema['elongated']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('elongated',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __pipelinemode_inp(self):
    """Print the 'pipelinemode' entry of the inp() listing (always shown)."""
    description = 'The pipeline operating mode'
    value = self.__pipelinemode( self.__globals_( ) )
    if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = '\x1B[1m\x1B[47m%-16.16s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,13+len(pre)+len(post))
def __infiles_inp(self):
    """Print the 'infiles' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__infiles_dflt( glb ) is None:
        return
    description = 'List of input files to be calibrated (default all)'
    value = self.__infiles( glb )
    if self.__validate_({'infiles': value},{'infiles': self.__schema['infiles']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('infiles',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __field_inp(self):
    """Print the 'field' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__field_dflt( glb ) is None:
        return
    description = 'Field to be calibrated (default all)'
    value = self.__field( glb )
    if self.__validate_({'field': value},{'field': self.__schema['field']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('field',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __spw_inp(self):
    """Print the 'spw' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__spw_dflt( glb ) is None:
        return
    description = 'select data by spw ids, e.g. \'3,5,7\' (\'\'=all)'
    value = self.__spw( glb )
    if self.__validate_({'spw': value},{'spw': self.__schema['spw']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('spw',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __scan_inp(self):
    """Print the 'scan' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__scan_dflt( glb ) is None:
        return
    description = 'select data by scan numbers, e.g. \'21~23\' (\'\'=all)'
    value = self.__scan( glb )
    if self.__validate_({'scan': value},{'scan': self.__schema['scan']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('scan',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __dryrun_inp(self):
    """Print the 'dryrun' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__dryrun_dflt( glb ) is None:
        return
    description = 'Run the task (False) or display task command (True)'
    value = self.__dryrun( glb )
    if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
def __acceptresults_inp(self):
    """Print the 'acceptresults' entry, but only while its mode-dependent default is not None."""
    glb = self.__globals_( )
    if self.__acceptresults_dflt( glb ) is None:
        return
    description = 'Add the results into the pipeline context'
    value = self.__acceptresults( glb )
    if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}):
        pre, post = '', ''
    else:
        pre, post = '\x1B[91m', '\x1B[0m'
    shown = ' \x1B[92m%-13.13s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post)
    self.__do_inp_output(shown,description,9+len(pre)+len(post))
#--------- global default implementation-------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def set_global_defaults(self):
    """Forget every hsd_skycal parameter stored in the frame returned by __globals_.

    The object is first recorded under key 'last' of the ``state`` mapping
    attached to this method by the decorator. Each parameter name found in
    the frame is then deleted, so later lookups fall back to the defaults.
    """
    self.set_global_defaults.state['last'] = self
    glb = self.__globals_( )
    parameter_names = (
        'dryrun', 'fraction', 'elongated', 'field', 'pipelinemode', 'scan',
        'acceptresults', 'noff', 'calmode', 'width', 'spw', 'infiles',
    )
    for name in parameter_names:
        if name in glb:
            del glb[name]
#--------- inp function -----------------------------------------------------------
def inp(self):
    """Print the current hsd_skycal parameter listing.

    Writes a header line, records the terminal size (80x24 when it cannot be
    determined) in ``term_width`` / ``term_height``, then lets each *_inp
    method print its own entry in a fixed order.
    """
    print("# hsd_skycal -- %s" % self._info_desc_)
    size = shutil.get_terminal_size(fallback=(80, 24))
    self.term_width, self.term_height = size
    printers = (
        self.__calmode_inp,
        self.__fraction_inp,
        self.__noff_inp,
        self.__width_inp,
        self.__elongated_inp,
        self.__pipelinemode_inp,
        self.__infiles_inp,
        self.__field_inp,
        self.__spw_inp,
        self.__scan_inp,
        self.__dryrun_inp,
        self.__acceptresults_inp,
    )
    for show in printers:
        show( )
#--------- tget function ----------------------------------------------------------
@static_var('state', __sf__('casa_inp_go_state'))
def tget(self,file=None):
    """Load saved parameter values into the frame returned by find_frame().

    Parameters:
        file -- path of the file to load. When None, "hsd_skycal.last" in the
                current directory is used if it exists. A value that is not a
                string, or that names no existing file, counts as "not found".

    When a file is found it is executed with ``runpy.run_path`` and the names
    it defines are copied into the frame; the object is then recorded under
    key 'last' of the ``state`` mapping attached by the decorator. Otherwise a
    message is printed and set_global_defaults() is called.
    """
    from casashell.private.stack_manip import find_frame
    from runpy import run_path
    filename = None
    if file is None:
        if os.path.isfile("hsd_skycal.last"):
            filename = "hsd_skycal.last"
    elif isinstance(file, str):
        if os.path.isfile(file):
            filename = file
    if filename is not None:
        glob = find_frame( )
        # NOTE(review): run_path executes the file as Python code, so only
        # files from a trusted source should be loaded this way.
        newglob = run_path( filename, init_globals={ } )
        for name, value in newglob.items():
            # run_path also returns the module metadata it sets up itself
            # (__name__, __file__, __builtins__, ...); copying those would
            # overwrite the same names in the target frame, so skip them.
            if name.startswith('__') and name.endswith('__'):
                continue
            glob[name] = value
        self.tget.state['last'] = self
    else:
        print("could not find last file, setting defaults instead...")
        self.set_global_defaults( )
def __call__( self, calmode=None, fraction=None, noff=None, width=None, elongated=None, pipelinemode=None, infiles=None, field=None, spw=None, scan=None, dryrun=None, acceptresults=None ):
    """Run the hsd_skycal pipeline task.

    Parameters are those described in the class docstring. If at least one
    argument is not None, the call uses the supplied values and fills the
    rest from the mode-dependent defaults. If every argument is None, all
    values are read from the frame returned by __globals_ (inp/go semantics).

    The resolved parameters are written to 'hsd_skycal.pre' before the task
    runs, and that file is renamed to 'hsd_skycal.last' afterwards. Both file
    operations are best effort: a failure there never aborts the call.

    Returns:
        Whatever the underlying task returns, or False when the task raised
        an exception (which is reported through casalog).
    """
    def noobj(s):
        # A repr of the form '<...>' cannot be read back as Python source,
        # so it is recorded as None instead.
        if s.startswith('<') and s.endswith('>'):
            return "None"
        return s
    _prefile = os.path.realpath('hsd_skycal.pre')
    _postfile = os.path.realpath('hsd_skycal.last')
    _return_result_ = None
    _arguments = [calmode,fraction,noff,width,elongated,pipelinemode,infiles,field,spw,scan,dryrun,acceptresults]
    _invocation_parameters = OrderedDict( )
    if any(_arg is not None for _arg in _arguments):
        # invoke python style
        # set the non sub-parameters that are not None
        local_global = { }
        if calmode is not None: local_global['calmode'] = calmode
        if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode
        # the invocation parameters for the non-subparameters can now be set - this picks up those defaults
        _invocation_parameters['calmode'] = self.__calmode( local_global )
        _invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global )
        # the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default
        _invocation_parameters['fraction'] = self.__fraction( _invocation_parameters ) if fraction is None else fraction
        _invocation_parameters['noff'] = self.__noff( _invocation_parameters ) if noff is None else noff
        _invocation_parameters['width'] = self.__width( _invocation_parameters ) if width is None else width
        _invocation_parameters['elongated'] = self.__elongated( _invocation_parameters ) if elongated is None else elongated
        _invocation_parameters['infiles'] = self.__infiles( _invocation_parameters ) if infiles is None else infiles
        _invocation_parameters['field'] = self.__field( _invocation_parameters ) if field is None else field
        _invocation_parameters['spw'] = self.__spw( _invocation_parameters ) if spw is None else spw
        _invocation_parameters['scan'] = self.__scan( _invocation_parameters ) if scan is None else scan
        _invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun
        _invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults
    else:
        # invoke with inp/go semantics
        _glb = self.__globals_( )
        _invocation_parameters['calmode'] = self.__calmode( _glb )
        _invocation_parameters['fraction'] = self.__fraction( _glb )
        _invocation_parameters['noff'] = self.__noff( _glb )
        _invocation_parameters['width'] = self.__width( _glb )
        _invocation_parameters['elongated'] = self.__elongated( _glb )
        _invocation_parameters['pipelinemode'] = self.__pipelinemode( _glb )
        _invocation_parameters['infiles'] = self.__infiles( _glb )
        _invocation_parameters['field'] = self.__field( _glb )
        _invocation_parameters['spw'] = self.__spw( _glb )
        _invocation_parameters['scan'] = self.__scan( _glb )
        _invocation_parameters['dryrun'] = self.__dryrun( _glb )
        _invocation_parameters['acceptresults'] = self.__acceptresults( _glb )
    try:
        with open(_prefile,'w') as _f:
            _rendered = [(_i,noobj(repr(_invocation_parameters[_i]))) for _i in _invocation_parameters]
            for _name, _text in _rendered:
                _f.write("%-13s = %s\n" % (_name,_text))
            _f.write("#hsd_skycal( ")
            _f.write(",".join("%s=%s" % (_name,_text) for _name, _text in _rendered))
            _f.write(" )\n")
    except Exception:
        # Recording the parameters is best effort; a failure must not stop the
        # task. Unlike a bare except, this lets KeyboardInterrupt/SystemExit through.
        pass
    try:
        _return_result_ = _hsd_skycal_t( _invocation_parameters['calmode'],_invocation_parameters['fraction'],_invocation_parameters['noff'],_invocation_parameters['width'],_invocation_parameters['elongated'],_invocation_parameters['pipelinemode'],_invocation_parameters['infiles'],_invocation_parameters['field'],_invocation_parameters['spw'],_invocation_parameters['scan'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] )
    except Exception as e:
        from traceback import format_exc
        from casatasks import casalog
        casalog.origin('hsd_skycal')
        casalog.post("Exception Reported: Error in hsd_skycal: %s" % str(e),'SEVERE')
        casalog.post(format_exc( ))
        _return_result_ = False
    try:
        os.rename(_prefile,_postfile)
    except OSError:
        # The .pre file may not exist (e.g. the write above failed); ignore.
        pass
    return _return_result_
# Instantiate the task class once at import time and bind the callable
# instance to the public name hsd_skycal.
hsd_skycal = _hsd_skycal( )