Source code for pipeline.hif.tasks.refant.referenceantenna

import os

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.infrastructure import task_registry
from ...heuristics import findrefant

# Public names of this module: the serial task with its inputs and results
# classes, followed by the 'Hpc' variants of the task and its inputs.
__all__ = [
    'RefAnt',
    'RefAntInputs',
    'RefAntResults',
    'HpcRefAnt',
    'HpcRefAntInputs',
]

# Module-level logger, obtained from the pipeline infrastructure and keyed by
# this module's name.
LOG = infrastructure.get_logger(__name__)


class RefAntInputs(vdp.StandardInputs):
    """Inputs for the RefAnt task.

    Every argument other than ``context`` defaults to None in the
    constructor and is stored through the matching property defined on this
    class or inherited from ``vdp.StandardInputs``.
    """

    @vdp.VisDependentProperty
    def field(self):
        """Return the default field selection for the current MS.

        The selection covers every field of ``self.ms`` observed with
        ``self.intent``. It is a comma-separated string of field names, or of
        field IDs when the names are not unique across the selected fields.
        """
        fields = self.ms.get_fields(intent=self.intent)
        names = {f.name for f in fields}
        ids = {f.id for f in fields}

        # Fields with different intents may share a name. When that happens
        # the name list would be ambiguous, so fall back to the IDs.
        if len(names) != len(ids):
            return ','.join([str(i) for i in ids])
        return ','.join(names)

    flagging = vdp.VisDependentProperty(default=True)
    geometry = vdp.VisDependentProperty(default=True)
    intent = vdp.VisDependentProperty(default='AMPLITUDE,BANDPASS,PHASE,POLARIZATION')
    refant = vdp.VisDependentProperty(default='')
    refantignore = vdp.VisDependentProperty(default='')

    # NOTE: a plain ``spw = vdp.VisDependentProperty(default='')`` used to be
    # declared here and was then rebound by the definition below, so it never
    # took effect. Only the effective definition is kept, in the same position
    # so the order of names in the class namespace is unchanged.
    @vdp.VisDependentProperty
    def spw(self):
        """Return the default spectral window selection for the current MS.

        The selection is a comma-separated string of the IDs of all spectral
        windows whose intents include at least one of PHASE, BANDPASS,
        AMPLITUDE or POLARIZATION.
        """
        wanted = {'PHASE', 'BANDPASS', 'AMPLITUDE', 'POLARIZATION'}
        selected = [window for window in self.ms.get_spectral_windows()
                    if not wanted.isdisjoint(window.intents)]
        return ','.join([str(window.id) for window in selected])

    hm_refant = vdp.VisDependentProperty(default='automatic')

    @hm_refant.convert
    def hm_refant(self, value):
        """Normalise ``value``: anything other than 'manual' becomes 'automatic'."""
        return 'manual' if value == 'manual' else 'automatic'

    def to_casa_args(self):
        """Not supported: refant does not use CASA tasks.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError

    def __init__(self, context, vis=None, output_dir=None, field=None, spw=None, intent=None, hm_refant=None,
                 refant=None, geometry=None, flagging=None, refantignore=None):
        """Store the task arguments on the instance.

        The assignment order below is kept exactly as written; the
        vis-dependent properties may rely on ``context`` and ``vis`` being
        set first (assumption, to be confirmed against the vdp module).
        """
        self.context = context
        self.vis = vis
        self.output_dir = output_dir

        self.field = field
        self.spw = spw
        self.intent = intent

        self.hm_refant = hm_refant
        self.refant = refant
        self.geometry = geometry
        self.flagging = flagging
        self.refantignore = refantignore
class RefAntResults(basetask.Results):
    """Result of a reference antenna selection for one measurement set."""

    def __init__(self, vis, refant):
        """Create the result.

        Args:
            vis: name of the measurement set the selection applies to, or
                None when there is nothing to report.
            refant: iterable of antenna identifiers in the order they should
                be stored; each element is converted with ``str`` and the
                elements are joined with commas. None means no reference
                antenna was selected.
        """
        super(RefAntResults, self).__init__()
        self._vis = vis
        # Keep None as None so the 'no result' guards in merge_with_context
        # and __str__ can actually trigger; joining None would raise
        # TypeError instead.
        if refant is None:
            self._refant = None
        else:
            self._refant = ','.join([str(ant) for ant in refant])

    def merge_with_context(self, context):
        """Record the reference antenna string on the matching MS in ``context``.

        Logs an error and returns without touching the context when either
        the measurement set name or the reference antenna is missing.
        """
        if self._vis is None or self._refant is None:
            LOG.error('No results to merge')
            return

        # Do we also need to locate the antenna in the measurement set?
        # This might become necessary when using different sessions
        ms = context.observing_run.get_ms(name=self._vis)

        if ms:
            LOG.debug('Setting refant for {!s} to {!r}'.format(ms.basename, self._refant))
            ms.reference_antenna = self._refant

    def __str__(self):
        """Return a short human-readable summary of the selection."""
        if self._vis is None or self._refant is None:
            return ('Reference antenna results:\n'
                    '\tNo reference antenna selected')
        else:
            return ('Reference antenna results:\n'
                    '{!s}: refant={!r}'.format(os.path.basename(self._vis), self._refant))

    def __repr__(self):
        """Return a constructor-like representation of the stored values."""
        return 'RefAntResults({!r}, {!r})'.format(self._vis, self._refant)
@task_registry.set_equivalent_casa_task('hif_refant')
@task_registry.set_casa_commands_comment(
    'Antennas are prioritized and enumerated based on fraction flagged and position in the array. The best antenna is '
    'used as a reference antenna unless it gets flagged, in which case the next-best antenna is used.\n'
    'This stage performs a pipeline calculation without running any CASA commands to be put in this file.'
)
class RefAnt(basetask.StandardTaskTemplate):
    """Task that determines the reference antenna list for a measurement set.

    In 'manual' mode the list is taken from the ``refant`` input; in
    'automatic' mode it is computed by ``findrefant.RefAntHeuristics``.
    """

    Inputs = RefAntInputs

    def __init__(self, inputs):
        super(RefAnt, self).__init__(inputs)

    def prepare(self, **parameters):
        """Build the reference antenna list and wrap it in a RefAntResults.

        Returns:
            RefAntResults for ``inputs.vis`` holding the selected antennas.

        Raises:
            NotImplementedError: if ``inputs.hm_refant`` is neither 'manual'
                nor 'automatic'.
        """
        inputs = self.inputs

        # Get the reference antenna list
        if inputs.hm_refant == 'manual':
            # The manual list is given as a comma-separated string.
            refant = inputs.refant.split(',')

        elif inputs.hm_refant == 'automatic':
            casa_intents = utils.to_CASA_intent(inputs.ms, inputs.intent)

            heuristics = findrefant.RefAntHeuristics(vis=inputs.vis, field=inputs.field, spw=inputs.spw,
                                                     intent=casa_intents, geometry=inputs.geometry,
                                                     flagging=inputs.flagging, refantignore=inputs.refantignore)

            refant = heuristics.calculate()

        else:
            # NotImplemented is a non-callable constant; the exception class
            # that carries a message is NotImplementedError.
            raise NotImplementedError('Unhandled hm_refant value: {!r}'.format(inputs.hm_refant))

        return RefAntResults(inputs.vis, refant)

    def analyse(self, results):
        """Return ``results`` unchanged; no further analysis is performed."""
        return results
class HpcRefAntInputs(RefAntInputs):
    """RefAntInputs extended with a ``parallel`` argument.

    ``parallel`` is provided by ``sessionutils.parallel_inputs_impl()``; all
    other arguments are passed to RefAntInputs unchanged.
    """

    parallel = sessionutils.parallel_inputs_impl()

    def to_casa_args(self):
        """Not supported on the session-level inputs.

        Raises:
            NotImplementedError: always.
        """
        # Session tasks don't implement to_casa_args; it's the individual tasks
        # that do so
        raise NotImplementedError

    def __init__(self, context, vis=None, output_dir=None, field=None, spw=None, intent=None, hm_refant=None,
                 refant=None, geometry=None, flagging=None, refantignore=None, parallel=None):
        """Initialise the inherited inputs, then store ``parallel``."""
        # Everything except 'parallel' belongs to the parent class and is
        # passed through by keyword.
        parent_kwargs = dict(
            vis=vis,
            output_dir=output_dir,
            field=field,
            spw=spw,
            intent=intent,
            hm_refant=hm_refant,
            refant=refant,
            geometry=geometry,
            flagging=flagging,
            refantignore=refantignore,
        )
        super(HpcRefAntInputs, self).__init__(context, **parent_kwargs)

        self.parallel = parallel
@task_registry.set_equivalent_casa_task('hpc_hif_refant')
class HpcRefAnt(sessionutils.ParallelTemplate):
    """Session-level wrapper that runs RefAnt through sessionutils.ParallelTemplate."""

    Inputs = HpcRefAntInputs
    Task = RefAnt

    def __init__(self, inputs):
        super(HpcRefAnt, self).__init__(inputs)

    def get_result_for_exception(self, vis, result):
        """Build a FailedTaskResults describing a failure for ``vis``.

        Args:
            vis: name of the measurement set being processed (not used here).
            result: the object describing the failure, presumably the
                exception that was raised -- confirm against
                sessionutils.ParallelTemplate.

        Returns:
            basetask.FailedTaskResults for this task class.
        """
        # Not every exception type has a 'message' attribute (built-in
        # exceptions lost it in Python 3), so fall back to the string form
        # rather than raising AttributeError while reporting a failure.
        message = getattr(result, 'message', str(result))
        return basetask.FailedTaskResults(self.__class__, result, message)