Source code for pipeline.hifa.tasks.imageprecheck.imageprecheck

import copy

import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.api as api
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.common.sensitivity import Sensitivity
from pipeline.hifa.heuristics import imageprecheck
from pipeline.hif.heuristics import imageparams_factory
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry

LOG = infrastructure.get_logger(__name__)


[docs]class ImagePreCheckResults(basetask.Results): def __init__(self, real_repr_target=False, repr_target='', repr_source='', repr_spw=None, reprBW_mode=None, reprBW_nbin=None, minAcceptableAngResolution='0.0arcsec', maxAcceptableAngResolution='0.0arcsec', maxAllowedBeamAxialRatio=0.0, sensitivityGoal='0mJy', hm_robust=0.5, hm_uvtaper=[], sensitivities=None, sensitivity_bandwidth=None, score=None, single_continuum=False, per_spw_cont_sensitivities_all_chan=None, synthesized_beams=None, beamRatios=None, error=False, error_msg=None): super(ImagePreCheckResults, self).__init__() if sensitivities is None: sensitivities = [] self.real_repr_target = real_repr_target self.repr_target = repr_target self.repr_source = repr_source self.repr_spw = repr_spw self.reprBW_mode = reprBW_mode self.reprBW_nbin = reprBW_nbin self.minAcceptableAngResolution = minAcceptableAngResolution self.maxAcceptableAngResolution = maxAcceptableAngResolution self.maxAllowedBeamAxialRatio = maxAllowedBeamAxialRatio self.sensitivityGoal = sensitivityGoal self.hm_robust = hm_robust self.hm_uvtaper = hm_uvtaper self.sensitivities = sensitivities self.sensitivities_for_aqua = [] self.sensitivity_bandwidth = sensitivity_bandwidth self.score = score self.single_continuum = single_continuum self.per_spw_cont_sensitivities_all_chan = per_spw_cont_sensitivities_all_chan self.synthesized_beams = synthesized_beams self.beamRatios = beamRatios self.error = error self.error_msg = error_msg
[docs] def merge_with_context(self, context): """ See :method:`~pipeline.infrastructure.api.Results.merge_with_context` """ # Store imaging parameters in context # Calculated sensitivities for later stages if self.per_spw_cont_sensitivities_all_chan is not None: if 'recalc' in self.per_spw_cont_sensitivities_all_chan: context.per_spw_cont_sensitivities_all_chan = copy.deepcopy(self.per_spw_cont_sensitivities_all_chan) del context.per_spw_cont_sensitivities_all_chan['recalc'] else: utils.update_sens_dict(context.per_spw_cont_sensitivities_all_chan, self.per_spw_cont_sensitivities_all_chan) # Calculated beams for later stages if self.synthesized_beams is not None: if 'recalc' in self.synthesized_beams: context.synthesized_beams = copy.deepcopy(self.synthesized_beams) del context.synthesized_beams['recalc'] else: utils.update_beams_dict(context.synthesized_beams, self.synthesized_beams) # Calculated robust and uvtaper values for later stages # # Note: For Cycle 6 the robust heuristic is used in subsequent stages. # The uvtaper heuristic is not yet to be used. context.imaging_parameters['robust'] = self.hm_robust #context.imaging_parameters['uvtaper'] = self.hm_uvtaper # It was decided not use a file based transport for the time being (03/2018) # Write imageparams.dat file #imageparams_filehandler = imageparamsfilehandler.ImageParamsFileHandler() #imageparams_filehandler.write(self.hm_robust, self.hm_uvtaper) # Add sensitivities to be reported to AQUA self.sensitivities_for_aqua.extend([s for s in self.sensitivities if s['robust']==self.hm_robust and s['uvtaper']==self.hm_uvtaper])
def __repr__(self): return 'ImagePreCheckResults:\n\t{0}'.format( '\n\t'.join(['robust=%.2f' % (self.hm_robust), 'uvtaper=%s' % (self.hm_uvtaper)]))
[docs]class ImagePreCheckInputs(vdp.StandardInputs): calcsb = vdp.VisDependentProperty(default=False) parallel = vdp.VisDependentProperty(default='automatic') def __init__(self, context, vis=None, calcsb=None, parallel=None): self.context = context self.vis = vis self.calcsb = calcsb self.parallel = parallel
# tell the infrastructure to give us mstransformed data when possible by # registering our preference for imaging measurement sets api.ImagingMeasurementSetsPreferred.register(ImagePreCheckInputs)
[docs]@task_registry.set_equivalent_casa_task('hifa_imageprecheck') class ImagePreCheck(basetask.StandardTaskTemplate): Inputs = ImagePreCheckInputs is_multi_vis_task = True
[docs] def prepare(self): inputs = self.inputs context = self.inputs.context cqa = casa_tools.quanta calcsb = inputs.calcsb parallel = inputs.parallel known_per_spw_cont_sensitivities_all_chan = context.per_spw_cont_sensitivities_all_chan known_synthesized_beams = context.synthesized_beams imageprecheck_heuristics = imageprecheck.ImagePreCheckHeuristics(inputs) image_heuristics_factory = imageparams_factory.ImageParamsHeuristicsFactory() image_heuristics = image_heuristics_factory.getHeuristics( vislist=inputs.vis, spw='', observing_run=context.observing_run, imagename_prefix=context.project_structure.ousstatus_entity_id, proj_params=context.project_performance_parameters, contfile=context.contfile, linesfile=context.linesfile, imaging_params=context.imaging_parameters, imaging_mode='ALMA' ) repr_target, repr_source, repr_spw, repr_freq, reprBW_mode, real_repr_target, minAcceptableAngResolution, maxAcceptableAngResolution, maxAllowedBeamAxialRatio, sensitivityGoal = image_heuristics.representative_target() repr_field = list(image_heuristics.field_intent_list('TARGET', repr_source))[0][0] repr_ms = self.inputs.ms[0] real_repr_spw = context.observing_run.virtual2real_spw_id(int(repr_spw), repr_ms) real_repr_spw_obj = repr_ms.get_spectral_window(real_repr_spw) single_continuum = any(['Single_Continuum' in t for t in real_repr_spw_obj.transitions]) # Get the array diameter = min([a.diameter for a in repr_ms.antennas]) if diameter == 7.0: array = '7m' robust_values_to_check = [0.5] else: array = '12m' robust_values_to_check = [0.0, 0.5, 1.0, 2.0] # Approximate reprBW with nbin if reprBW_mode in ['nbin', 'repr_spw']: physicalBW_of_1chan = float(real_repr_spw_obj.channels[0].getWidth().convert_to(measures.FrequencyUnits.HERTZ).value) nbin = int(cqa.getvalue(cqa.convert(repr_target[2], 'Hz'))/physicalBW_of_1chan + 0.5) cont_sens_bw_modes = ['aggBW'] scale_aggBW_to_repBW = False elif reprBW_mode == 'multi_spw': nbin = -1 cont_sens_bw_modes = ['repBW', 'aggBW'] scale_aggBW_to_repBW = True else: nbin = -1 cont_sens_bw_modes = ['repBW', 'aggBW'] scale_aggBW_to_repBW = False primary_beam_size = image_heuristics.largest_primary_beam_size(spwspec=str(repr_spw), intent='TARGET') gridder = image_heuristics.gridder('TARGET', repr_field) field_ids = image_heuristics.field('TARGET', repr_field) cont_spwids = sorted(context.observing_run.virtual_science_spw_ids) repr_field_obj = repr_ms.get_fields(repr_field, intent='TARGET')[0] filtered_cont_spwids = sorted( [context.observing_run.real2virtual_spw_id(s.id, repr_ms) for s in repr_field_obj.valid_spws if context.observing_run.real2virtual_spw_id(s.id, repr_ms) in list(map(int, cont_spwids))]) cont_spw = ','.join(map(str, filtered_cont_spwids)) num_cont_spw = len(filtered_cont_spwids) # Get default heuristics uvtaper value default_uvtaper = image_heuristics.uvtaper() beams = {(0.0, str(default_uvtaper), 'repBW'): None, \ (0.5, str(default_uvtaper), 'repBW'): None, \ (1.0, str(default_uvtaper), 'repBW'): None, \ (2.0, str(default_uvtaper), 'repBW'): None, \ (0.0, str(default_uvtaper), 'aggBW'): None, \ (0.5, str(default_uvtaper), 'aggBW'): None, \ (1.0, str(default_uvtaper), 'aggBW'): None, \ (2.0, str(default_uvtaper), 'aggBW'): None} cells = {} imsizes = {} sensitivities = [] sensitivity_bandwidth = None for robust in robust_values_to_check: # Calculate nbin / reprBW sensitivity if necessary if reprBW_mode in ['nbin', 'repr_spw']: beams[(robust, str(default_uvtaper), 'repBW')], known_synthesized_beams = image_heuristics.synthesized_beam( [(repr_field, 'TARGET')], str(repr_spw), robust=robust, uvtaper=default_uvtaper, known_beams=known_synthesized_beams, force_calc=calcsb, parallel=parallel, shift=True) # If the beam is invalid, error and return. if beams[(robust, str(default_uvtaper), 'repBW')] == 'invalid': LOG.error('Beam for repBW and robust value of %.1f is invalid. Cannot continue.' % robust) return ImagePreCheckResults(error=True, error_msg='Invalid beam') cells[(robust, str(default_uvtaper), 'repBW')] = image_heuristics.cell(beams[(robust, str(default_uvtaper), 'repBW')]) imsizes[(robust, str(default_uvtaper), 'repBW')] = image_heuristics.imsize(field_ids, cells[(robust, str(default_uvtaper), 'repBW')], primary_beam_size, centreonly=False) try: sensitivity, eff_ch_bw, sens_bw, known_per_spw_cont_sensitivities_all_chan = \ image_heuristics.calc_sensitivities(inputs.vis, repr_field, 'TARGET', str(repr_spw), nbin, {}, 'cube', gridder, cells[(robust, str(default_uvtaper), 'repBW')], imsizes[(robust, str(default_uvtaper), 'repBW')], 'briggs', robust, default_uvtaper, True, known_per_spw_cont_sensitivities_all_chan, calcsb) # Set calcsb flag to False since the first calculations of beam # and sensitivity will have already reset the dictionaries. calcsb = False sensitivities.append(Sensitivity( array=array, field=repr_field, spw=str(repr_spw), bandwidth=cqa.quantity(sens_bw, 'Hz'), bwmode='repBW', beam=beams[(robust, str(default_uvtaper), 'repBW')], cell=[cqa.convert(cells[(robust, str(default_uvtaper), 'repBW')][0], 'arcsec'), cqa.convert(cells[(robust, str(default_uvtaper), 'repBW')][0], 'arcsec')], robust=robust, uvtaper=default_uvtaper, sensitivity=cqa.quantity(sensitivity, 'Jy/beam'))) except: sensitivities.append(Sensitivity( array=array, field=repr_field, spw=str(repr_spw), bandwidth=cqa.quantity(0.0, 'Hz'), bwmode='repBW', beam=beams[(robust, str(default_uvtaper), 'repBW')], cell=['0.0 arcsec', '0.0 arcsec'], robust=robust, uvtaper=default_uvtaper, sensitivity=cqa.quantity(0.0, 'Jy/beam'))) sens_bw = 0.0 sensitivity_bandwidth = cqa.quantity(sens_bw, 'Hz') beams[(robust, str(default_uvtaper), 'aggBW')], known_synthesized_beams = image_heuristics.synthesized_beam( [(repr_field, 'TARGET')], cont_spw, robust=robust, uvtaper=default_uvtaper, known_beams=known_synthesized_beams, force_calc=calcsb, parallel=parallel, shift=True) # If the beam is invalid, error and return. if beams[(robust, str(default_uvtaper), 'aggBW')] == 'invalid': LOG.error('Beam for aggBW and robust value of %.1f is invalid. Cannot continue.' % robust) return ImagePreCheckResults(error=True, error_msg='Invalid beam') cells[(robust, str(default_uvtaper), 'aggBW')] = image_heuristics.cell(beams[(robust, str(default_uvtaper), 'aggBW')]) imsizes[(robust, str(default_uvtaper), 'aggBW')] = image_heuristics.imsize(field_ids, cells[(robust, str(default_uvtaper), 'aggBW')], primary_beam_size, centreonly=False) # Calculate full cont sensitivity (no frequency ranges excluded) try: sensitivity, eff_ch_bw, sens_bw, known_per_spw_cont_sensitivities_all_chan = \ image_heuristics.calc_sensitivities(inputs.vis, repr_field, 'TARGET', cont_spw, -1, {}, 'cont', gridder, cells[(robust, str(default_uvtaper), 'aggBW')], imsizes[(robust, str(default_uvtaper), 'aggBW')], 'briggs', robust, default_uvtaper, True, known_per_spw_cont_sensitivities_all_chan, calcsb) # Set calcsb flag to False since the first calculations of beam # and sensitivity will have already reset the dictionaries. calcsb = False for cont_sens_bw_mode in cont_sens_bw_modes: if scale_aggBW_to_repBW and cont_sens_bw_mode == 'repBW': # Handle scaling to repSPW_BW < repBW <= 0.9 * aggBW case _bandwidth = repr_target[2] _sensitivity = cqa.mul(cqa.quantity(sensitivity, 'Jy/beam'), cqa.sqrt(cqa.div(cqa.quantity(sens_bw, 'Hz'), repr_target[2]))) else: _bandwidth = cqa.quantity(min(sens_bw, num_cont_spw * 1.875e9), 'Hz') _sensitivity = cqa.quantity(sensitivity, 'Jy/beam') sensitivities.append(Sensitivity( array=array, field=repr_field, spw=cont_spw, bandwidth=_bandwidth, bwmode=cont_sens_bw_mode, beam=beams[(robust, str(default_uvtaper), 'aggBW')], cell=[cqa.convert(cells[(robust, str(default_uvtaper), 'aggBW')][0], 'arcsec'), cqa.convert(cells[(robust, str(default_uvtaper), 'aggBW')][0], 'arcsec')], robust=robust, uvtaper=default_uvtaper, sensitivity=_sensitivity)) except Exception as e: for _ in cont_sens_bw_modes: sensitivities.append(Sensitivity( array=array, field=repr_field, spw=cont_spw, bandwidth=cqa.quantity(0.0, 'Hz'), bwmode='aggBW', beam=beams[(robust, str(default_uvtaper), 'aggBW')], cell=['0.0 arcsec', '0.0 arcsec'], robust=robust, uvtaper=default_uvtaper, sensitivity=cqa.quantity(0.0, 'Jy/beam'))) sens_bw = 0.0 if sensitivity_bandwidth is None: sensitivity_bandwidth = cqa.quantity(_bandwidth, 'Hz') # Apply robust heuristic based on beam sizes for the used robust values. if reprBW_mode in ['nbin', 'repr_spw']: hm_robust, hm_robust_score, beamRatio_0p0, beamRatio_0p5, beamRatio_1p0, beamRatio_2p0 = \ imageprecheck_heuristics.compare_beams( \ beams[(0.0, str(default_uvtaper), 'repBW')], \ beams[(0.5, str(default_uvtaper), 'repBW')], \ beams[(1.0, str(default_uvtaper), 'repBW')], \ beams[(2.0, str(default_uvtaper), 'repBW')], \ minAcceptableAngResolution, \ maxAcceptableAngResolution, \ maxAllowedBeamAxialRatio) else: hm_robust, hm_robust_score, beamRatio_0p0, beamRatio_0p5, beamRatio_1p0, beamRatio_2p0 = \ imageprecheck_heuristics.compare_beams( \ beams[(0.0, str(default_uvtaper), 'aggBW')], \ beams[(0.5, str(default_uvtaper), 'aggBW')], \ beams[(1.0, str(default_uvtaper), 'aggBW')], \ beams[(2.0, str(default_uvtaper), 'aggBW')], \ minAcceptableAngResolution, \ maxAcceptableAngResolution, \ maxAllowedBeamAxialRatio) # Save beam ratios for weblog beamRatios = { \ (0.0, str(default_uvtaper)): beamRatio_0p0, (0.5, str(default_uvtaper)): beamRatio_0p5, (1.0, str(default_uvtaper)): beamRatio_1p0, (2.0, str(default_uvtaper)): beamRatio_2p0 } if real_repr_target: # Determine heuristic UV taper value # # For ALMA Cycle 6 the additional beam, cell and sensitivity values for a different # uvtaper are not to be calculated, shown or used. if False and hm_robust == 2.0: if reprBW_mode in ['nbin', 'repr_spw']: hm_uvtaper = image_heuristics.uvtaper(beam_natural=beams[(2.0, str(default_uvtaper), 'repBW')], protect_long=None) else: hm_uvtaper = image_heuristics.uvtaper(beam_natural=beams[(2.0, str(default_uvtaper), 'aggBW')], protect_long=None) if hm_uvtaper != []: # Add sensitivity entries with actual tapering beams[(hm_robust, str(hm_uvtaper), 'repBW')], known_synthesized_beams = image_heuristics.synthesized_beam( [(repr_field, 'TARGET')], str(repr_spw), robust=hm_robust, uvtaper=hm_uvtaper, known_beams=known_synthesized_beams, force_calc=calcsb, parallel=parallel, shift=True) # If the beam is invalid, error and return. if beams[(hm_robust, str(hm_uvtaper), 'repBW')] == 'invalid': LOG.error('Beam for uvtaper repBW is invalid. Cannot continue.') return ImagePreCheckResults(error=True, error_msg='Invalid beam') cells[(hm_robust, str(hm_uvtaper), 'repBW')] = image_heuristics.cell(beams[(hm_robust, str(hm_uvtaper), 'repBW')]) imsizes[(hm_robust, str(hm_uvtaper), 'repBW')] = image_heuristics.imsize(field_ids, cells[(hm_robust, str(hm_uvtaper), 'repBW')], primary_beam_size, centreonly=False) if reprBW_mode in ['nbin', 'repr_spw']: try: sensitivity, eff_ch_bw, sens_bw, known_per_spw_cont_sensitivities_all_chan = \ image_heuristics.calc_sensitivities(inputs.vis, repr_field, 'TARGET', str(repr_spw), nbin, {}, 'cube', gridder, cells[(hm_robust, str(hm_uvtaper), 'repBW')], imsizes[(hm_robust, str(hm_uvtaper), 'repBW')], 'briggs', hm_robust, hm_uvtaper, True, known_per_spw_cont_sensitivities_all_chan, calcsb) sensitivities.append(Sensitivity( array=array, field=repr_field, spw=str(repr_spw), bandwidth=cqa.quantity(sens_bw, 'Hz'), bwmode='repBW', beam=beams[(hm_robust, str(hm_uvtaper), 'repBW')], cell=[cqa.convert(cells[(hm_robust, str(hm_uvtaper), 'repBW')][0], 'arcsec'), cqa.convert(cells[(hm_robust, str(hm_uvtaper), 'repBW')][0], 'arcsec')], robust=hm_robust, uvtaper=hm_uvtaper, sensitivity=cqa.quantity(sensitivity, 'Jy/beam'))) except Exception as e: sensitivities.append(Sensitivity( array=array, field=repr_field, spw=str(repr_spw), bandwidth=cqa.quantity(0.0, 'Hz'), bwmode='repBW', beam=beams[(hm_robust, str(hm_uvtaper), 'repBW')], cell=['0.0 arcsec', '0.0 arcsec'], robust=robust, uvtaper=hm_uvtaper, sensitivity=cqa.quantity(0.0, 'Jy/beam'))) beams[(hm_robust, str(hm_uvtaper), 'aggBW')], known_synthesized_beams = image_heuristics.synthesized_beam( [(repr_field, 'TARGET')], cont_spw, robust=hm_robust, uvtaper=hm_uvtaper, known_beams=known_synthesized_beams, force_calc=calcsb, parallel=parallel, shift=True) # If the beam is invalid, error and return. if beams[(hm_robust, str(hm_uvtaper), 'aggBW')] == 'invalid': LOG.error('Beam for uvtaper aggBW is invalid. Cannot continue.') return ImagePreCheckResults(error=True, error_msg='Invalid beam') cells[(hm_robust, str(hm_uvtaper), 'aggBW')] = image_heuristics.cell(beams[(hm_robust, str(hm_uvtaper), 'aggBW')]) imsizes[(hm_robust, str(hm_uvtaper), 'aggBW')] = image_heuristics.imsize(field_ids, cells[(hm_robust, str(hm_uvtaper), 'aggBW')], primary_beam_size, centreonly=False) try: sensitivity, eff_ch_bw, sens_bw, known_per_spw_cont_sensitivities_all_chan = \ image_heuristics.calc_sensitivities(inputs.vis, repr_field, 'TARGET', cont_spw, -1, {}, 'cont', gridder, cells[(hm_robust, str(hm_uvtaper), 'aggBW')], imsizes[(hm_robust, str(hm_uvtaper), 'aggBW')], 'briggs', hm_robust, hm_uvtaper, True, known_per_spw_cont_sensitivities_all_chan, calcsb) if scale_aggBW_to_repBW and cont_sens_bw_mode == 'repBW': # Handle scaling to repSPW_BW < repBW <= 0.9 * aggBW case _bandwidth = repr_target[2] _sensitivity = cqa.mul(cqa.quantity(sensitivity, 'Jy/beam'), cqa.sqrt(cqa.div(cqa.quantity(sens_bw, 'Hz'), repr_target[2]))) else: _bandwidth = cqa.quantity(min(sens_bw, num_cont_spw * 1.875e9), 'Hz') _sensitivity = cqa.quantity(sensitivity, 'Jy/beam') for cont_sens_bw_mode in cont_sens_bw_modes: sensitivities.append(Sensitivity( array=array, field=repr_field, spw=str(repr_spw), bandwidth=_bandwidth, bwmode=cont_sens_bw_mode, beam=beams[(hm_robust, str(hm_uvtaper), 'aggBW')], cell=[cqa.convert(cells[(hm_robust, str(hm_uvtaper), 'aggBW')][0], 'arcsec'), cqa.convert(cells[(hm_robust, str(hm_uvtaper), 'aggBW')][0], 'arcsec')], robust=hm_robust, uvtaper=hm_uvtaper, sensitivity=_sensitivity)) except: for _ in cont_sens_bw_modes: sensitivities.append(Sensitivity( array=array, field=repr_field, spw=str(repr_spw), bandwidth=cqa.quantity(0.0, 'Hz'), bwmode='repBW', beam=beams[(hm_robust, str(hm_uvtaper), 'aggBW')], cell=['0.0 arcsec', '0.0 arcsec'], robust=robust, uvtaper=hm_uvtaper, sensitivity=cqa.quantity(0.0, 'Jy/beam'))) else: hm_uvtaper = default_uvtaper else: hm_robust = 0.5 hm_uvtaper = default_uvtaper minAcceptableAngResolution = cqa.quantity(0.0, 'arcsec') maxAcceptableAngResolution = cqa.quantity(0.0, 'arcsec') hm_uvtaper = default_uvtaper return ImagePreCheckResults( real_repr_target, repr_target, repr_source, repr_spw, reprBW_mode, nbin, minAcceptableAngResolution=minAcceptableAngResolution, maxAcceptableAngResolution=maxAcceptableAngResolution, maxAllowedBeamAxialRatio=maxAllowedBeamAxialRatio, sensitivityGoal=sensitivityGoal, hm_robust=hm_robust, hm_uvtaper=hm_uvtaper, sensitivities=sensitivities, sensitivity_bandwidth=sensitivity_bandwidth, score=hm_robust_score, single_continuum=single_continuum, per_spw_cont_sensitivities_all_chan=known_per_spw_cont_sensitivities_all_chan, synthesized_beams=known_synthesized_beams, beamRatios=beamRatios )
[docs] def analyse(self, results): return results