Source code for pipeline.hif.tasks.setmodel.setmodel
import copy
import os
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.h.heuristics import fieldnames as fieldnames
from pipeline.h.tasks.common import commonfluxresults
from pipeline.infrastructure import task_registry
from . import setjy
LOG = infrastructure.get_logger(__name__)
[docs]class SetModelsInputs(vdp.StandardInputs):
normfluxes = vdp.VisDependentProperty(default=True)
refintent = vdp.VisDependentProperty(default = 'AMPLITUDE')
scalebycan = vdp.VisDependentProperty(default=True)
transintent = vdp.VisDependentProperty(default = 'BANDPASS')
@vdp.VisDependentProperty
def reference(self):
# this will give something like '0542+3243,0343+242'
field_fn = fieldnames.IntentFieldnames()
reference_fields = field_fn.calculate(self.ms, self.refintent)
# run the answer through a set, just in case there are duplicates
fields = {s for s in utils.safe_split(reference_fields)}
return ','.join(fields)
@vdp.VisDependentProperty
def reffile(self):
value = os.path.join(self.context.output_dir, 'flux.csv')
return value
@vdp.VisDependentProperty
def transfer(self):
transfer_fn = fieldnames.IntentFieldnames()
# call the heuristic to get the transfer fields as a string
transfer_fields = transfer_fn.calculate(self.ms, self.transintent)
# remove the reference field should it also have been observed with
# the transfer intent
transfers = set(self.ms.get_fields(task_arg=transfer_fields))
references = set(self.ms.get_fields(task_arg=self.reference))
diff = transfers.difference(references)
transfer_names = {f.name for f in diff}
fields_with_name = self.ms.get_fields(name=transfer_names)
if len(fields_with_name) is not len(diff) or len(diff) is not len(transfer_names):
return ','.join([str(f.id) for f in diff])
else:
return ','.join(transfer_names)
def __init__(self, context, output_dir=None, vis=None, reference=None,
refintent=None, transfer=None, transintent=None,
reffile=None, normfluxes=None, scalebychan=None):
super(SetModelsInputs, self).__init__()
self.context = context
self.vis = vis
self.output_dir = output_dir
self.reference = reference
self.refintent = refintent
self.transfer = transfer
self.transintent = transintent
self.reffile = reffile
self.normfluxes = normfluxes
self.scalebychan = scalebychan
[docs]@task_registry.set_equivalent_casa_task('hif_setmodels')
class SetModels(basetask.StandardTaskTemplate):
Inputs = SetModelsInputs
[docs] def prepare(self, **parameters):
# Initialize the result.
result = commonfluxresults.FluxCalibrationResults(vis=self.inputs.vis)
# Set reference calibrator models.
# These models will be assigned the lookup reference frequency,
# Stokes parameters, and spix if they are available. If they are not the
# Setjy defaults (spw center frequency, [1.0, 0.0, 0.0, 0.0], 0.0)
# will be used
reference_fields = self.inputs.reference
reference_intents = self.inputs.refintent
if reference_fields not in (None, ''):
refresults = self._do_setjy(
reference_fields, reference_intents, reffile=self.inputs.reffile,
normfluxes=False, scalebychan=self.inputs.scalebychan)
# Add measurements to the results object
result.measurements.update(copy.deepcopy(refresults.measurements))
# Set transfer calibrator models.
# These models will be assigned the lookup reference frequency,
# Stokes parameters, and spix if they are available. If they are not the
# Setjy defaults (spw center frequency, [1.0, 0.0, 0.0, 0.0], 0.0)
# will be used . If normfluxes is True then the stokes parameters
# will be normalized to a value of 1
transfer_fields = self.inputs.transfer
transfer_intents = self.inputs.transintent
if transfer_fields not in (None, ''):
transresults = self._do_setjy(
transfer_fields, transfer_intents, reffile=self.inputs.reffile,
normfluxes=self.inputs.normfluxes, scalebychan=self.inputs.scalebychan)
# Add measurements to the results object
result.measurements.update(copy.deepcopy(transresults.measurements))
return result
# Call the Setjy task
def _do_setjy(self, field, intent, reffile=None, normfluxes=None, scalebychan=None):
task_args = {
'output_dir': self.inputs.output_dir,
'vis': self.inputs.vis,
'field': field,
'intent': intent,
'fluxdensity': -1,
'reffile': reffile,
'normfluxes': normfluxes,
'scalebychan': scalebychan
}
task_inputs = vdp.InputsContainer(setjy.Setjy, self.inputs.context, **task_args)
task = setjy.Setjy(task_inputs)
results_list = self._executor.execute(task, merge=False)
return results_list[0]