Source code for pipeline.h.cli.gotasks.h_exportdata

##################### generated by xml-casa (v2) from h_exportdata.xml ##############
##################### b548baac21036617ab0fea3909c95993 ##############################
from __future__ import absolute_import
from casashell.private.stack_manip import find_local as __sf__
from casashell.private.stack_manip import find_frame as _find_frame
from casatools.typecheck import validator as _pc
from casatools.coercetype import coerce as _coerce
from pipeline.h.cli import h_exportdata as _h_exportdata_t
from collections import OrderedDict
import numpy
import sys
import os

import shutil

[docs]def static_var(varname, value): def decorate(func): setattr(func, varname, value) return func return decorate
class _h_exportdata: """ h_exportdata ---- Prepare interferometry data for export The hif_exportdata task exports the data defined in the pipeline context and exports it to the data products directory, converting and or packing it as necessary. The current version of the task exports the following products o an XML file containing the pipeline processing request o a tar file per ASDM / MS containing the final flags version o a text file per ASDM / MS containing the final calibration apply list o a FITS image for each selected calibrator source image o a FITS image for each selected science target source image o a tar file per session containing the caltables for that session o a tar file containing the file web log o a text file containing the final list of CASA commands Issues Support for merging the calibration state information into the pipeline context / results structure and retrieving it still needs to be added. Support for merging the clean results into the pipeline context / results structure and retrieving it still needs to be added. Support for creating the final pipeline results entity still needs to be added. Session information is not currently handled by the pipeline context. By default all ASDMs are combined into one session. Returns If pipeline mode is 'getinputs' then None is returned. Otherwise the results object for the pipeline task is returned. --------- parameter descriptions --------------------------------------------- vis List of visibility data files for which flagging and calibration information will be exported. Defaults to the list maintained in the pipeline context. Can only be set in pipelinemode='interactive' example: vis=['X227.ms', 'X228.ms'] session session -- List of sessions one per visibility file. Defaults to a single virtual session containing all the visibility files in vis. Can only be set in pipelinemode='interactive' example: session=['session1', 'session2'] imaging_products_only Export the science target image products only exportmses Export MeasurementSets defined in vis instead of flags, caltables, and calibration instructions. Can only be set in pipelinemode='interactive' example: exportmses = True pprfile Name of the pipeline processing request to be exported. Defaults to a file matching the template 'PPR_*.xml'. Can only be set in pipelinemode='interactive' example: pprfile=['PPR_GRB021004.xml'] calintents calintents -- List of calibrator image types to be exported. Defaults to all standard calibrator intents 'BANDPASS', 'PHASE', 'FLUX' Can only be set in pipelinemode='interactive' example: calintents='PHASE' calimages List of calibrator images to be exported. Defaults to all calibrator images recorded in the pipeline context. Can only be set in pipelinemode='interactive' example: calimages=['3C454.3.bandpass', '3C279.phase'] targetimages List of science target images to be exported. Science target images recorded in the pipeline context. Can only be set in pipelinemode='interactive' example: targetimages=['NGC3256.band3', 'NGC3256.band6'] products_dir Name of the data products subdirectory. Can only be set in pipelinemode='interactive' example: products_dir='../products' pipelinemode The pipeline operating mode. In 'automatic' mode the pipeline determines the values of all context defined pipeline inputs automatically. In 'interactive' mode the user can set the pipeline context defined parameters manually. In 'getinputs' mode the user can check the settings of all pipeline parameters without running the task. dryrun Run the task (False) or display task command (True) acceptresults Add the results into the pipeline context --------- examples ----------------------------------------------------------- Examples 1. Export the pipeline results for a single session to the data products directory !mkdir ../products hif_exportdata (products_dir='../products') 2. Export the pipeline results to the data products directory specify that only the gain calibrator images be saved. !mkdir ../products hif_exportdata (products_dir='../products', calintents='*PHASE*') """ _info_group_ = """pipeline""" _info_desc_ = """Prepare interferometry data for export""" __schema = {'vis': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'session': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'imaging_products_only': {'type': 'cBool'}, 'exportmses': {'type': 'cBool'}, 'pprfile': {'type': 'cStr', 'coerce': _coerce.to_str}, 'calintents': {'type': 'cStr', 'coerce': _coerce.to_str}, 'calimages': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'targetimages': {'type': 'cStrVec', 'coerce': [_coerce.to_list,_coerce.to_strvec]}, 'products_dir': {'type': 'cStr', 'coerce': _coerce.to_str}, 'pipelinemode': {'type': 'cStr', 'coerce': _coerce.to_str, 'allowed': [ 'automatic', 'interactive', 'getinputs' ]}, 'dryrun': {'type': 'cBool'}, 'acceptresults': {'type': 'cBool'}} def __init__(self): self.__stdout = None self.__stderr = None self.__root_frame_ = None def __globals_(self): if self.__root_frame_ is None: self.__root_frame_ = _find_frame( ) assert self.__root_frame_ is not None, "could not find CASAshell global frame" return self.__root_frame_ def __to_string_(self,value): if type(value) is str: return "'%s'" % value else: return str(value) def __validate_(self,doc,schema): return _pc.validate(doc,schema) def __do_inp_output(self,param_prefix,description_str,formatting_chars): out = self.__stdout or sys.stdout description = description_str.split( ) prefix_width = 23 + 21 + 4 output = [ ] addon = '' first_addon = True while len(description) > 0: ## starting a new line..................................................................... if len(output) == 0: ## for first line add parameter information............................................ if len(param_prefix)-formatting_chars > prefix_width - 1: output.append(param_prefix) continue addon = param_prefix + ' #' first_addon = True addon_formatting = formatting_chars else: ## for subsequent lines space over prefix width........................................ addon = (' ' * prefix_width) + '#' first_addon = False addon_formatting = 0 ## if first word of description puts us over the screen width, bail........................ if len(addon + description[0]) - addon_formatting + 1 > self.term_width: ## if we're doing the first line make sure it's output................................. if first_addon: output.append(addon) break while len(description) > 0: ## if the next description word puts us over break for the next line................... if len(addon + description[0]) - addon_formatting + 1 > self.term_width: break addon = addon + ' ' + description[0] description.pop(0) output.append(addon) out.write('\n'.join(output) + '\n') #--------- return nonsubparam values ---------------------------------------------- def __imaging_products_only_dflt( self, glb ): return False def __imaging_products_only( self, glb ): if 'imaging_products_only' in glb: return glb['imaging_products_only'] return False def __exportmses_dflt( self, glb ): return False def __exportmses( self, glb ): if 'exportmses' in glb: return glb['exportmses'] return False def __pipelinemode_dflt( self, glb ): return 'automatic' def __pipelinemode( self, glb ): if 'pipelinemode' in glb: return glb['pipelinemode'] return 'automatic' #--------- return inp/go default -------------------------------------------------- def __targetimages_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return [] if self.__pipelinemode( glb ) == "getinputs": return [] return None def __dryrun_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return bool(False) return None def __calimages_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return [] if self.__pipelinemode( glb ) == "getinputs": return [] return None def __vis_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return [] if self.__pipelinemode( glb ) == "getinputs": return [] return None def __acceptresults_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return bool(True) return None def __pprfile_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return "" if self.__pipelinemode( glb ) == "getinputs": return "" return None def __session_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return [] if self.__pipelinemode( glb ) == "getinputs": return [] return None def __calintents_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return "" if self.__pipelinemode( glb ) == "getinputs": return "" return None def __products_dir_dflt( self, glb ): if self.__pipelinemode( glb ) == "interactive": return "" if self.__pipelinemode( glb ) == "getinputs": return "" return None #--------- return subparam values ------------------------------------------------- def __vis( self, glb ): if 'vis' in glb: return glb['vis'] dflt = self.__vis_dflt( glb ) if dflt is not None: return dflt return [ ] def __session( self, glb ): if 'session' in glb: return glb['session'] dflt = self.__session_dflt( glb ) if dflt is not None: return dflt return [ ] def __pprfile( self, glb ): if 'pprfile' in glb: return glb['pprfile'] dflt = self.__pprfile_dflt( glb ) if dflt is not None: return dflt return '' def __calintents( self, glb ): if 'calintents' in glb: return glb['calintents'] dflt = self.__calintents_dflt( glb ) if dflt is not None: return dflt return '' def __calimages( self, glb ): if 'calimages' in glb: return glb['calimages'] dflt = self.__calimages_dflt( glb ) if dflt is not None: return dflt return [ ] def __targetimages( self, glb ): if 'targetimages' in glb: return glb['targetimages'] dflt = self.__targetimages_dflt( glb ) if dflt is not None: return dflt return [ ] def __products_dir( self, glb ): if 'products_dir' in glb: return glb['products_dir'] dflt = self.__products_dir_dflt( glb ) if dflt is not None: return dflt return '' def __dryrun( self, glb ): if 'dryrun' in glb: return glb['dryrun'] dflt = self.__dryrun_dflt( glb ) if dflt is not None: return dflt return False def __acceptresults( self, glb ): if 'acceptresults' in glb: return glb['acceptresults'] dflt = self.__acceptresults_dflt( glb ) if dflt is not None: return dflt return True #--------- subparam inp output ---------------------------------------------------- def __vis_inp(self): if self.__vis_dflt( self.__globals_( ) ) is not None: description = 'List of input visibility data' value = self.__vis( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'vis': value},{'vis': self.__schema['vis']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('vis',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __session_inp(self): if self.__session_dflt( self.__globals_( ) ) is not None: description = 'List of sessions one per visibility file' value = self.__session( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'session': value},{'session': self.__schema['session']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('session',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __imaging_products_only_inp(self): description = 'Export the science target image products only' value = self.__imaging_products_only( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'imaging_products_only': value},{'imaging_products_only': self.__schema['imaging_products_only']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output('%-21.21s = %s%-23s%s' % ('imaging_products_only',pre,self.__to_string_(value),post),description,0+len(pre)+len(post)) def __exportmses_inp(self): description = 'Export MeasurementSets instead of flags and caltables' value = self.__exportmses( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'exportmses': value},{'exportmses': self.__schema['exportmses']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output('%-21.21s = %s%-23s%s' % ('exportmses',pre,self.__to_string_(value),post),description,0+len(pre)+len(post)) def __pprfile_inp(self): if self.__pprfile_dflt( self.__globals_( ) ) is not None: description = 'The pipeline processing request file to be exported' value = self.__pprfile( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'pprfile': value},{'pprfile': self.__schema['pprfile']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('pprfile',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __calintents_inp(self): if self.__calintents_dflt( self.__globals_( ) ) is not None: description = 'The calibrator source target intents to be exported' value = self.__calintents( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'calintents': value},{'calintents': self.__schema['calintents']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('calintents',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __calimages_inp(self): if self.__calimages_dflt( self.__globals_( ) ) is not None: description = 'List of calibrator images to be exported' value = self.__calimages( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'calimages': value},{'calimages': self.__schema['calimages']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('calimages',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __targetimages_inp(self): if self.__targetimages_dflt( self.__globals_( ) ) is not None: description = 'List of target images to be exported' value = self.__targetimages( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'targetimages': value},{'targetimages': self.__schema['targetimages']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('targetimages',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __products_dir_inp(self): if self.__products_dir_dflt( self.__globals_( ) ) is not None: description = 'The data products directory' value = self.__products_dir( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'products_dir': value},{'products_dir': self.__schema['products_dir']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('products_dir',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __pipelinemode_inp(self): description = 'The pipeline operating mode' value = self.__pipelinemode( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'pipelinemode': value},{'pipelinemode': self.__schema['pipelinemode']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output('\x1B[1m\x1B[47m%-21.21s =\x1B[0m %s%-23s%s' % ('pipelinemode',pre,self.__to_string_(value),post),description,13+len(pre)+len(post)) def __dryrun_inp(self): if self.__dryrun_dflt( self.__globals_( ) ) is not None: description = 'Run the task (False) or display task command (True)' value = self.__dryrun( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'dryrun': value},{'dryrun': self.__schema['dryrun']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('dryrun',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) def __acceptresults_inp(self): if self.__acceptresults_dflt( self.__globals_( ) ) is not None: description = 'Add the results into the pipeline context' value = self.__acceptresults( self.__globals_( ) ) (pre,post) = ('','') if self.__validate_({'acceptresults': value},{'acceptresults': self.__schema['acceptresults']}) else ('\x1B[91m','\x1B[0m') self.__do_inp_output(' \x1B[92m%-18.18s =\x1B[0m %s%-23s%s' % ('acceptresults',pre,self.__to_string_(value),post),description,9+len(pre)+len(post)) #--------- global default implementation------------------------------------------- @static_var('state', __sf__('casa_inp_go_state')) def set_global_defaults(self): self.set_global_defaults.state['last'] = self glb = self.__globals_( ) if 'targetimages' in glb: del glb['targetimages'] if 'dryrun' in glb: del glb['dryrun'] if 'calimages' in glb: del glb['calimages'] if 'pipelinemode' in glb: del glb['pipelinemode'] if 'vis' in glb: del glb['vis'] if 'acceptresults' in glb: del glb['acceptresults'] if 'pprfile' in glb: del glb['pprfile'] if 'imaging_products_only' in glb: del glb['imaging_products_only'] if 'session' in glb: del glb['session'] if 'calintents' in glb: del glb['calintents'] if 'exportmses' in glb: del glb['exportmses'] if 'products_dir' in glb: del glb['products_dir'] #--------- inp function ----------------------------------------------------------- def inp(self): print("# h_exportdata -- %s" % self._info_desc_) self.term_width, self.term_height = shutil.get_terminal_size(fallback=(80, 24)) self.__vis_inp( ) self.__session_inp( ) self.__imaging_products_only_inp( ) self.__exportmses_inp( ) self.__pprfile_inp( ) self.__calintents_inp( ) self.__calimages_inp( ) self.__targetimages_inp( ) self.__products_dir_inp( ) self.__pipelinemode_inp( ) self.__dryrun_inp( ) self.__acceptresults_inp( ) #--------- tget function ---------------------------------------------------------- @static_var('state', __sf__('casa_inp_go_state')) def tget(self,file=None): from casashell.private.stack_manip import find_frame from runpy import run_path filename = None if file is None: if os.path.isfile("h_exportdata.last"): filename = "h_exportdata.last" elif isinstance(file, str): if os.path.isfile(file): filename = file if filename is not None: glob = find_frame( ) newglob = run_path( filename, init_globals={ } ) for i in newglob: glob[i] = newglob[i] self.tget.state['last'] = self else: print("could not find last file, setting defaults instead...") self.set_global_defaults( ) def __call__( self, vis=None, session=None, imaging_products_only=None, exportmses=None, pprfile=None, calintents=None, calimages=None, targetimages=None, products_dir=None, pipelinemode=None, dryrun=None, acceptresults=None ): def noobj(s): if s.startswith('<') and s.endswith('>'): return "None" else: return s _prefile = os.path.realpath('h_exportdata.pre') _postfile = os.path.realpath('h_exportdata.last') _return_result_ = None _arguments = [vis,session,imaging_products_only,exportmses,pprfile,calintents,calimages,targetimages,products_dir,pipelinemode,dryrun,acceptresults] _invocation_parameters = OrderedDict( ) if any(map(lambda x: x is not None,_arguments)): # invoke python style # set the non sub-parameters that are not None local_global = { } if imaging_products_only is not None: local_global['imaging_products_only'] = imaging_products_only if exportmses is not None: local_global['exportmses'] = exportmses if pipelinemode is not None: local_global['pipelinemode'] = pipelinemode # the invocation parameters for the non-subparameters can now be set - this picks up those defaults _invocation_parameters['imaging_products_only'] = self.__imaging_products_only( local_global ) _invocation_parameters['exportmses'] = self.__exportmses( local_global ) _invocation_parameters['pipelinemode'] = self.__pipelinemode( local_global ) # the sub-parameters can then be set. Use the supplied value if not None, else the function, which gets the appropriate default _invocation_parameters['vis'] = self.__vis( _invocation_parameters ) if vis is None else vis _invocation_parameters['session'] = self.__session( _invocation_parameters ) if session is None else session _invocation_parameters['pprfile'] = self.__pprfile( _invocation_parameters ) if pprfile is None else pprfile _invocation_parameters['calintents'] = self.__calintents( _invocation_parameters ) if calintents is None else calintents _invocation_parameters['calimages'] = self.__calimages( _invocation_parameters ) if calimages is None else calimages _invocation_parameters['targetimages'] = self.__targetimages( _invocation_parameters ) if targetimages is None else targetimages _invocation_parameters['products_dir'] = self.__products_dir( _invocation_parameters ) if products_dir is None else products_dir _invocation_parameters['dryrun'] = self.__dryrun( _invocation_parameters ) if dryrun is None else dryrun _invocation_parameters['acceptresults'] = self.__acceptresults( _invocation_parameters ) if acceptresults is None else acceptresults else: # invoke with inp/go semantics _invocation_parameters['vis'] = self.__vis( self.__globals_( ) ) _invocation_parameters['session'] = self.__session( self.__globals_( ) ) _invocation_parameters['imaging_products_only'] = self.__imaging_products_only( self.__globals_( ) ) _invocation_parameters['exportmses'] = self.__exportmses( self.__globals_( ) ) _invocation_parameters['pprfile'] = self.__pprfile( self.__globals_( ) ) _invocation_parameters['calintents'] = self.__calintents( self.__globals_( ) ) _invocation_parameters['calimages'] = self.__calimages( self.__globals_( ) ) _invocation_parameters['targetimages'] = self.__targetimages( self.__globals_( ) ) _invocation_parameters['products_dir'] = self.__products_dir( self.__globals_( ) ) _invocation_parameters['pipelinemode'] = self.__pipelinemode( self.__globals_( ) ) _invocation_parameters['dryrun'] = self.__dryrun( self.__globals_( ) ) _invocation_parameters['acceptresults'] = self.__acceptresults( self.__globals_( ) ) try: with open(_prefile,'w') as _f: for _i in _invocation_parameters: _f.write("%-21s = %s\n" % (_i,noobj(repr(_invocation_parameters[_i])))) _f.write("#h_exportdata( ") count = 0 for _i in _invocation_parameters: _f.write("%s=%s" % (_i,noobj(repr(_invocation_parameters[_i])))) count += 1 if count < len(_invocation_parameters): _f.write(",") _f.write(" )\n") except: pass try: _return_result_ = _h_exportdata_t( _invocation_parameters['vis'],_invocation_parameters['session'],_invocation_parameters['imaging_products_only'],_invocation_parameters['exportmses'],_invocation_parameters['pprfile'],_invocation_parameters['calintents'],_invocation_parameters['calimages'],_invocation_parameters['targetimages'],_invocation_parameters['products_dir'],_invocation_parameters['pipelinemode'],_invocation_parameters['dryrun'],_invocation_parameters['acceptresults'] ) except Exception as e: from traceback import format_exc from casatasks import casalog casalog.origin('h_exportdata') casalog.post("Exception Reported: Error in h_exportdata: %s" % str(e),'SEVERE') casalog.post(format_exc( )) _return_result_ = False try: os.rename(_prefile,_postfile) except: pass return _return_result_ h_exportdata = _h_exportdata( )