Source code for pipeline.hifv.tasks.setmodel.vlasetjy

import csv
import math
import os

import numpy as np

import pipeline.domain as domain
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks.common import commonfluxresults
from pipeline.hifv.heuristics import find_EVLA_band
from pipeline.hifv.heuristics import standard as standard
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry

# Paper
# http://iopscience.iop.org/article/10.1088/0067-0049/204/2/19/pdf

LOG = infrastructure.get_logger(__name__)

ORIGIN = 'hifa_vlasetjy'


[docs]def find_standards(positions): """Function for finding standards from the original scripted EVLA pipeline """ # set the max separation as ~1' MAX_SEPARATION = 60*2.0e-5 position_3C48 = casa_tools.measures.direction('j2000', '1h37m41.299', '33d9m35.133') fields_3C48 = [] position_3C138 = casa_tools.measures.direction('j2000', '5h21m9.886', '16d38m22.051') fields_3C138 = [] position_3C147 = casa_tools.measures.direction('j2000', '5h42m36.138', '49d51m7.234') fields_3C147 = [] position_3C286 = casa_tools.measures.direction('j2000', '13h31m8.288', '30d30m32.959') fields_3C286 = [] for ii in range(0, len(positions)): position = casa_tools.measures.direction('j2000', str(positions[ii][0]) + 'rad', str(positions[ii][1]) + 'rad') separation = casa_tools.measures.separation(position, position_3C48)['value'] * math.pi / 180.0 if (separation < MAX_SEPARATION): fields_3C48.append(ii) else: separation = casa_tools.measures.separation(position, position_3C138)['value'] * math.pi / 180.0 if (separation < MAX_SEPARATION): fields_3C138.append(ii) else: separation = casa_tools.measures.separation(position, position_3C147)['value'] * math.pi / 180.0 if (separation < MAX_SEPARATION): fields_3C147.append(ii) else: separation = casa_tools.measures.separation(position, position_3C286)['value'] * math.pi / 180.0 if (separation < MAX_SEPARATION): fields_3C286.append(ii) fields = [fields_3C48, fields_3C138, fields_3C147, fields_3C286] return fields
[docs]def standard_sources(vis): """Get standard source names, fields, and positions - convenience function from the original EVLA scripted pipeline """ with casa_tools.TableReader(vis + '/FIELD') as table: field_positions = table.getcol('PHASE_DIR') positions = [] for ii in range(0, len(field_positions[0][0])): positions.append([field_positions[0][0][ii], field_positions[1][0][ii]]) standard_source_names = ['3C48', '3C138', '3C147', '3C286'] standard_source_fields = find_standards(positions) standard_source_found = False for standard_source_field in standard_source_fields: if standard_source_field: standard_source_found = True if not standard_source_found: standard_source_found = False LOG.error("ERROR: No standard flux density calibrator observed, flux density scale will be arbitrary") QA_calprep='Fail' return standard_source_names, standard_source_fields
[docs]class VLASetjyInputs(vdp.StandardInputs): @vdp.VisDependentProperty def field(self): # Get field ids in the current ms that have been observed # with the desired intent fields = self.ms.get_fields(intent=self.intent) unique_field_names = {f.name for f in fields} field_ids = {f.id for f in fields} # Fields with different intents may have the same name. Check for this # and return the field ids rather than the names to resolve any # ambiguities. if len(unique_field_names) is len(field_ids): return ','.join(unique_field_names) else: return ','.join([str(i) for i in field_ids]) intent = vdp.VisDependentProperty(default='AMPLITUDE') @vdp.VisDependentProperty def refspectra(self): # If flux density was explicitly set and is not equal to -1, which is # hard-coded as the default value in the task interface return the # default tuple which is composed of the reference frequency, the # Stokes fluxdensity and the spectral index if self.fluxdensity is not -1: return (self.reffreq, self.fluxdensity, self.spix) # There is no ms object. if not self.ms: return # The fluxdensity parameter is set to -1 which indicates that we must do a # flux lookup. The # lookup order is: # 1) from file, unless it's a solar system object # 2) from CASA # TODO: Replace with reading directly from the context # Read the reference flux values from a file ref_flux = [] if os.path.exists(self.reffile): with open(self.reffile, 'rt') as f: # First row is header row reader = csv.reader(f) next(reader) # Loop over rows trying the new CSV format first for row in reader: try: try: (ms_name, field_id, spw_id, I, Q, U, V, spix, uvmin, uvmax, comment) = row except: (ms_name, field_id, spw_id, I, Q, U, V, comment) = row spix = str(self.spix) except ValueError: LOG.warning('Invalid flux statement in %s: \'%s' '\'' % (self.reffile, row)) continue # Check that the entry is for the correct MS if os.path.basename(ms_name) != self.ms.basename: continue # Add the value spw_id = int(spw_id) ref_flux.append((field_id, spw_id, float(I), float(Q), float(U), float(V), float(spix), float(uvmin), float(uvmax))) # TODO sort the flux values in spw order? # Issue warning if the reference file was specified but not found. if not os.path.exists(self.reffile) and self.reffile not in ('', None): LOG.warning('Flux reference file not found: {!s}'.format(self.reffile)) # Get the spectral window ids for the spws specified by the inputs spws = self.ms.get_spectral_windows(self.spw) spw_ids = sorted(spw.id for spw in spws) # In order to print flux densities in the same order as the fields, we # must retrieve the flux density for each field in turn field_flux = [] for field_arg in utils.safe_split(self.field): # Field names may resolve to multiple field IDs fields = self.ms.get_fields(task_arg=field_arg, intent=self.intent) field_ids = {str(field.id) for field in fields} field_names = {field.name for field in fields} # Find fluxes flux_by_spw = [] for spw_id in spw_ids: reffreq = str(self.ms.get_spectral_window(spw_id).centre_frequency) if self.normfluxes: flux = [(reffreq, [I / I, Q / I, U / I, V / I], spix) for (ref_field_id, ref_spw_id, I, Q, U, V, spix, uvmin, uvmax) in ref_flux if (ref_field_id in field_ids or ref_field_id in field_names) and ref_spw_id == spw_id] else: flux = [(reffreq, [I, Q, U, V], spix) for (ref_field_id, ref_spw_id, I, Q, U, V, spix, uvmin, uvmax) in ref_flux if (ref_field_id in field_ids or ref_field_id in field_names) and ref_spw_id == spw_id] # No flux measurements found for the requested field/spws, so do # either a CASA model look-up (-1) or reset the flux to 1. if not flux: if any('AMPLITUDE' in f.intents for f in fields): flux = (reffreq, -1, self.spix) else: flux = (reffreq, [-1], self.spix) # If this is a solar system calibrator, ignore any catalogue # flux and request a CASA model lookup if not field_names.isdisjoint(standard.Standard.ephemeris_fields): LOG.debug('Ignoring records from file for solar system ' 'calibrator') flux = (reffreq, -1, 0.0) flux_by_spw.append(flux[0] if len(flux) is 1 else flux) field_flux.append(flux_by_spw[0] if len(flux_by_spw) is 1 else flux_by_spw) return field_flux[0] if len(field_flux) is 1 else field_flux @vdp.VisDependentProperty def reffile(self): value = os.path.join(self.context.output_dir, 'flux.csv') return value normfluxes = vdp.VisDependentProperty(default=False) reffreq = vdp.VisDependentProperty(default='1GHz') fluxdensity = vdp.VisDependentProperty(default=-1) spix = vdp.VisDependentProperty(default=0.0) scalebychan = vdp.VisDependentProperty(default=True) model = vdp.VisDependentProperty(default='') @vdp.VisDependentProperty def standard(self): # Get the standard heuristics function. heu_standard = standard.Standard() # The field may be an integer, but the standard heuristic operates on # strings so determine the corresponding name of the fields field_names = [] for field in utils.safe_split(self.field): if str(field).isdigit(): matching_fields = self.ms.get_fields(field) assert len(matching_fields) is 1 field_names.append(matching_fields[0].name) else: field_names.append(field) standards = [heu_standard(field) for field in field_names] return standards[0] if len(standards) is 1 else standards def __init__(self, context, output_dir=None, vis=None, field=None, intent=None, spw=None, model=None, scalebychan=None, fluxdensity=None, spix=None, reffreq=None, standard=None, refspectra=None, reffile=None, normfluxes=None): super(VLASetjyInputs, self).__init__() self.context = context self.vis = vis self.output_dir = output_dir self.field = field self.intent = intent self.spw = spw self.model = model self.scalebychan = scalebychan self.fluxdensity = fluxdensity self.spix = spix self.reffreq = reffreq self.standard = standard self.refspectra = refspectra self.reffile = reffile self.normfluxes = normfluxes
[docs] def to_casa_args(self): d = super(VLASetjyInputs, self).to_casa_args() d['fluxdensity'] = d['refspectra'][1] try: np.testing.assert_almost_equal(d['refspectra'][2], 0.0) except: d['reffreq'] = 'TOPO ' + d['refspectra'][0].replace(" ", "") d['spix'] = d['refspectra'][2] # Filter out reffile. Note that the , is required for ignore in ('reffile', 'refspectra', 'normfluxes',): if ignore in d: del d[ignore] # Enable intent selection in CASA. d['selectdata'] = True # Force usescratch to True for now d['usescratch'] = True return d
[docs]@task_registry.set_equivalent_casa_task('hifv_vlasetjy') class VLASetjy(basetask.StandardTaskTemplate): Inputs = VLASetjyInputs
[docs] def prepare(self): inputs = self.inputs result = commonfluxresults.FluxCalibrationResults(vis=inputs.vis) # Return early if the field has no data of the required intent. This # could be the case when given multiple MSes, one of which could be # without an amplitude calibrator for instance. if not inputs.ms.get_fields(inputs.field, intent=inputs.intent): LOG.warning('Field(s) \'%s\' in %s have no data with intent %s' % (inputs.field, inputs.ms.basename, inputs.intent)) return result # get the spectral windows for the spw inputs argument # This is done later in the loop for the VLA, just keeping here for history memory # print "Type of inputs.spw: ", inputs.spw, type(inputs.spw) # spws = [spw for spw in inputs.ms.get_spectral_windows(inputs.spw)] # Multiple spectral windows spawn multiple setjy jobs. Set a unique # marker in the CASA log so that we can identify log entries from this # particular task # start_marker = self._add_marker_to_casa_log() # loop over fields so that we can use Setjy for sources with different # standards setjy_dicts = [] standard_source_names, standard_source_fields = standard_sources(self.inputs.vis) m = self.inputs.context.observing_run.get_ms(self.inputs.vis) field_spws = m.get_vla_field_spws() spw2band = m.get_vla_spw2band() # bands = spw2band.values() # Look in spectral window domain object as this information already exists! with casa_tools.TableReader(self.inputs.vis + '/SPECTRAL_WINDOW') as table: channels = table.getcol('NUM_CHAN') originalBBClist = table.getcol('BBC_NO') spw_bandwidths = table.getcol('TOTAL_BANDWIDTH') reference_frequencies = table.getcol('REF_FREQUENCY') center_frequencies = [rf + spwbw / 2 for rf, spwbw in zip(reference_frequencies, spw_bandwidths)] # LOG.info("STANDARD SOURCE FIELDS:") # print standard_source_fields for i, fields in enumerate(standard_source_fields): for myfield in fields: domainfield = m.get_fields(myfield)[0] if 'AMPLITUDE' in domainfield.intents: inputs.field = myfield # Use the domain object to get the actual field id, ***NOT*** the index # setjyfield = m.get_fields()[myfield].id jobs = [] VLAspws = field_spws[myfield] strlistVLAspws = ','.join(str(spw) for spw in VLAspws) spws = [spw for spw in inputs.ms.get_spectral_windows(strlistVLAspws)] for spw in spws: inputs.spw = spw.id reference_frequency = center_frequencies[spw.id] try: EVLA_band = spw2band[spw.id] except: LOG.info('Unable to get band from spw id - using reference frequency instead') EVLA_band = find_EVLA_band(reference_frequency) LOG.info("Center freq for spw " + str(spw.id) + " = " + str(reference_frequency) + ", observing band = " + EVLA_band) model_image = standard_source_names[i] + '_' + EVLA_band + '.im' LOG.info("Setting model for field " + str(m.get_fields()[myfield].id) + " spw "+str(spw.id) + " using " + model_image) if self.inputs.model: LOG.warn("Model override input. Using {!s}".format(self.inputs.model)) model_image = self.inputs.model task_args = {'vis' : inputs.vis, 'field' : str(myfield), 'spw' : str(spw.id), 'selectdata' : False, 'model' : model_image, 'intent' : '', 'listmodels' : False, 'scalebychan' : inputs.scalebychan, 'fluxdensity' : inputs.fluxdensity, 'standard' : inputs.standard, 'usescratch' : True} jobs.append(casa_tasks.setjy(**task_args)) # Flux densities coming from a non-lookup are added to the # results so that user-provided calibrator fluxes are # committed back to the domain objects if inputs.refspectra[1] is not -1: try: (I, Q, U, V) = inputs.refspectra[1] flux = domain.FluxMeasurement(spw_id=spw.id, I=I, Q=Q, U=U, V=V, origin=ORIGIN) except: I = inputs.refspectra[1][0] flux = domain.FluxMeasurement(spw_id=spw.id, I=I, origin=ORIGIN) result.measurements[str(myfield)].append(flux) # merge identical jobs into one job with a multi-spw argument jobs_and_components = utils.merge_jobs(jobs, casa_tasks.setjy, merge=('spw',)) for job, _ in jobs_and_components: try: setjy_dicts.append(self._executor.execute(job)) except: LOG.warn("SetJy issue with field id=" + str(job.kw['field']) + " and spw=" + str(job.kw['spw'])) spw_seen = set() for setjy_dict in setjy_dicts: setjy_dict.pop('format') for field_id in setjy_dict: setjy_dict[field_id].pop('fieldName') field = self.inputs.ms.get_fields(field_id)[0] if inputs.refspectra[1] == -1: for spw_id in setjy_dict[field_id]: I = setjy_dict[field_id][spw_id]['fluxd'][0] Q = setjy_dict[field_id][spw_id]['fluxd'][1] U = setjy_dict[field_id][spw_id]['fluxd'][2] V = setjy_dict[field_id][spw_id]['fluxd'][3] flux = domain.FluxMeasurement(spw_id, I, Q=Q, U=U, V=V, origin=ORIGIN) if spw_id not in spw_seen: result.measurements[field.identifier].append(flux) spw_seen.add(spw_id) return result
[docs] def analyse(self, result): return result