import collections
import operator
import os
import uuid
from functools import reduce
import numpy as np
import scipy.stats as stats
import pipeline.domain as domain
import pipeline.domain.measures as measures
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.sessionutils as sessionutils
import pipeline.infrastructure.utils as utils
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import FluxMeasurement
from pipeline.h.tasks.common import commonfluxresults
from pipeline.h.tasks.common import commonhelpermethods
from pipeline.hif.tasks import applycal
from pipeline.hif.tasks import gaincal
from pipeline.hif.tasks.fluxscale import fluxscale
from pipeline.hif.tasks.setmodel import setjy
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import exceptions
from pipeline.infrastructure import task_registry
from . import fluxes
from ... import heuristics
__all__ = [
'GcorFluxscale',
'GcorFluxscaleInputs',
'GcorFluxscaleResults',
'SessionGcorFluxscale',
'SessionGcorFluxscaleInputs'
]
LOG = infrastructure.get_logger(__name__)
ORIGIN = 'gcorfluxscale'
class GcorFluxscaleResults(commonfluxresults.FluxCalibrationResults):
def __init__(self, vis, resantenna=None, uvrange=None, measurements=None, fluxscale_measurements=None,
applies_adopted=False):
super(GcorFluxscaleResults, self).__init__(vis, resantenna=resantenna, uvrange=uvrange,
measurements=measurements)
self.applies_adopted = applies_adopted
# To store the fluxscale derived flux measurements:
if fluxscale_measurements is None:
fluxscale_measurements = collections.defaultdict(list)
self.fluxscale_measurements = fluxscale_measurements
def merge_with_context(self, context):
# Update the measurement set with the calibrated visibility based flux
# measurements for later use in imaging (PIPE-644, PIPE-660).
ms = context.observing_run.get_ms(self.vis)
ms.derived_fluxes = self.measurements
@task_registry.set_equivalent_casa_task('hifa_gfluxscale')
@task_registry.set_casa_commands_comment(
'The absolute flux calibration is transferred to secondary calibrator sources.'
)
class GcorFluxscale(basetask.StandardTaskTemplate):
Inputs = GcorFluxscaleInputs
def __init__(self, inputs):
super(GcorFluxscale, self).__init__(inputs)
def prepare(self, **parameters):
inputs = self.inputs
# Initialize results.
result = GcorFluxscaleResults(inputs.vis, resantenna='', uvrange='')
# Check that the measurement set does have an amplitude calibrator.
if inputs.reference == '':
# No point carrying on if not.
LOG.error('%s has no data with reference intent %s' % (inputs.ms.basename, inputs.refintent))
return result
# Run setjy for sources in the reference list which have transfer intents.
if inputs.ms.get_fields(inputs.reference, intent=inputs.transintent):
self._do_setjy(reffile=inputs.reffile, field=inputs.reference)
else:
LOG.info('Flux calibrator field(s) {!r} in {!s} have no data with '
'intent {!s}'.format(inputs.reference, inputs.ms.basename, inputs.transintent))
# set the reference antenna
refant = inputs.refant
if refant == '':
# Get the reference antenna for this measurement set from the
# context. This comes back as a string containing a ranked
# list of antenna names.
refant = inputs.ms.reference_antenna
# If no reference antenna was found in the context for this measurement
# (refant equals either None or an empty string), then raise an exception.
if not (refant and refant.strip()):
msg = ('No reference antenna specified and none found in context for %s' % inputs.ms.basename)
LOG.error(msg)
raise exceptions.PipelineException(msg)
LOG.trace('refant: %s' % refant)
# Get the reference spwmap for flux scaling
refspwmap = inputs.refspwmap
if not refspwmap:
refspwmap = inputs.ms.reference_spwmap
if not refspwmap:
refspwmap = [-1]
# Determine the spw gaincal strategy. This determines how the phase only tables will
# be constructed.
if inputs.ms.combine_spwmap:
spwidlist = [spw.id for spw in inputs.ms.get_spectral_windows(science_windows_only=True)]
fieldnamelist = [field.name for field in inputs.ms.get_fields(task_arg=inputs.transfer, intent='PHASE')]
exptimes = heuristics.exptimes.get_scan_exptimes(inputs.ms, fieldnamelist, 'PHASE', spwidlist)
phaseup_solint = '%0.3fs' % (min([exptime[1] for exptime in exptimes]) / 4.0)
phase_gaintype = 'T'
phase_combine = 'spw'
phaseup_spwmap = inputs.ms.combine_spwmap
phase_interp = 'linearPD,linear'
else:
phaseup_solint = inputs.phaseupsolint
phase_gaintype = 'G'
phase_combine = ''
phaseup_spwmap = inputs.ms.phaseup_spwmap
phase_interp = None
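# Illustrative sketch (hypothetical numbers, not taken from any dataset):
# with spw combination the phase-only solint above is a quarter of the
# shortest PHASE-calibrator scan. E.g. if get_scan_exptimes() returned
# (field, exptime) pairs [('J1256-0547', 60.48), ('J1311-2216', 30.24)],
# then phaseup_solint = '%0.3fs' % (30.24 / 4.0) -> '7.560s', so even the
# shortest scan still yields several phase solutions.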
# Resolved source heuristics.
# Needs improvement if users start specifying the input antennas.
# For the time being force minblperant to be 2 instead of None to
# avoid ACA and Tsys flagging issues.
allantenna = inputs.antenna
minblperant = 2
if inputs.hm_resolvedcals == 'automatic':
# Get the antennas to be used in the gaincals, limiting
# the range if the reference calibrator is resolved.
refant0 = refant.split(',')[0] # use the first refant
resantenna, uvrange = heuristics.fluxscale.antenna(ms=inputs.ms, refsource=inputs.reference, refant=refant0,
peak_frac=inputs.peak_fraction)
# Do nothing if the source is unresolved.
# If the source is resolved but the number of
# antennas equals the total number of antennas
# use all the antennas but pass along the uvrange
# limit.
if resantenna == '' and uvrange == '':
pass
else:
nant = len(inputs.ms.antennas)
nresant = len(resantenna.split(','))
if nresant >= nant:
resantenna = allantenna
else:
resantenna = allantenna
uvrange = inputs.uvrange
result.resantenna = resantenna
result.uvrange = uvrange
# Do a phase-only gaincal on the flux calibrator using a
# restricted set of antennas
if resantenna == '':
filtered_refant = refant
else: # filter refant if resolved calibrator or antenna selection
resant_list = resantenna.rstrip('&').split(',')
filtered_refant = str(',').join([ant for ant in refant.split(',') if ant in resant_list])
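# For example (hypothetical antenna names): with refant = 'DA45,DA41,DV03'
# and resantenna = 'DA41,DA42,DA45&', the trailing '&' is stripped and the
# ranked refant list is reduced to the restricted antenna set, giving
# filtered_refant = 'DA45,DA41' (the original ranking order is preserved).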
r = self._do_gaincal(field=inputs.reference, intent=inputs.refintent, gaintype=phase_gaintype, calmode='p',
combine=phase_combine, solint=inputs.phaseupsolint, antenna=resantenna, uvrange=uvrange,
refant=filtered_refant, minblperant=minblperant, phaseup_spwmap=None, phase_interp=None,
append=False, merge=False)
# Test for the existence of the caltable
try:
caltable = r.final.pop().gaintable
except Exception:
caltable = r.error.pop().gaintable
LOG.warn('Cannot compute phase solution table %s for the flux calibrator' % (os.path.basename(caltable)))
# Do a phase-only gaincal on the remaining calibrators using the full
# set of antennas; append this to the phase solutions caltable produced
# for the flux calibrator; merge result into local task context so that
# the caltable is included in pre-apply for subsequent gaincal.
append = os.path.exists(caltable)
r = self._do_gaincal(caltable=caltable, field=inputs.transfer, intent=inputs.transintent,
gaintype=phase_gaintype, calmode='p', combine=phase_combine, solint=phaseup_solint,
antenna=allantenna, uvrange='', minblperant=None, refant=filtered_refant,
phaseup_spwmap=phaseup_spwmap, phase_interp=phase_interp, append=append, merge=True)
# Now do the amplitude-only gaincal. This will produce the caltable
# that fluxscale will analyse
try:
ampcal_result = self._do_gaincal(
field=inputs.transfer + ',' + inputs.reference,
intent=inputs.transintent + ',' + inputs.refintent, gaintype='T', calmode='a',
combine='', solint=inputs.solint, antenna=allantenna, uvrange='',
refant=filtered_refant, minblperant=minblperant, phaseup_spwmap=None,
phase_interp=None, append=False, merge=True)
# Get the gaincal caltable from the results
try:
caltable = ampcal_result.final.pop().gaintable
except Exception:
caltable = ' %s' % ampcal_result.error.pop().gaintable if ampcal_result.error else ''
LOG.warn('Cannot compute the flux scaling table%s' % (os.path.basename(caltable)))
# To make the following fluxscale reliable the caltable
# should contain gains for the same set of antennas for
# each of the amplitude and phase calibrators - looking
# at each spw separately.
if os.path.exists(caltable):
check_ok = self._check_caltable(caltable=caltable, ms=inputs.ms, reference=inputs.reference,
transfer=inputs.transfer)
else:
check_ok = False
except Exception:
caltable = ' %s' % ampcal_result.error.pop().gaintable if ampcal_result.error else ''
LOG.warn('Cannot compute the flux scaling table%s for the phase '
'and bandpass calibrators' % (os.path.basename(caltable)))
check_ok = False
# If no valid amplitude caltable is available to analyse, then log an
# error and return early.
if not check_ok:
LOG.error('Unable to complete flux scaling operation for MS %s' % (os.path.basename(inputs.vis)))
return result
# Otherwise, continue with derivation of flux densities.
# PIPE-644: derive both fluxscale-based scaling factors and
# calibrated visibility fluxes.
try:
# Derive fluxscale-based flux measurements, and store in the result
# for reporting in weblog.
fluxscale_result = self._derive_fluxscale_flux(caltable, refspwmap)
result.fluxscale_measurements.update(fluxscale_result.measurements)
# Computing calibrated visibility fluxes will require a temporary
# applycal, which is performed as part of "_derive_calvis_flux()"
# below. To prepare for this temporary applycal, first update the
# callibrary in the local context to replace the amplitude caltable
# produced earlier (which used a default flux density of 1.0 Jy)
# with the caltable produced by fluxscale, which contains
# amplitudes set according to the derived flux values.
self._replace_amplitude_caltable(ampcal_result, fluxscale_result)
# Derive calibrated visibility based flux measurements
# and store in result for reporting in weblog and merging into
# context (into measurement set).
calvis_fluxes = self._derive_calvis_flux()
result.measurements.update(calvis_fluxes.measurements)
except Exception as e:
# Something has gone wrong, return an empty result
LOG.error('Unable to complete flux scaling operation for MS {}'.format(inputs.ms.basename))
LOG.exception('Flux scaling error', exc_info=e)
return result
def analyse(self, result):
return result
@staticmethod
def _check_caltable(caltable, ms, reference, transfer):
"""
Check that the given caltable is well-formed so that a 'fluxscale'
will run successfully on it:
1. Check that the caltable contains results for the reference and
transfer fields.
"""
# get the ids of the reference source and phase source(s)
ref_fieldid = {field.id for field in ms.fields if field.name == reference}
transfer_fieldids = {field.id for field in ms.fields if field.name in transfer}
with casa_tools.TableReader(caltable) as table:
fieldids = table.getcol('FIELD_ID')
# warn if the field IDs do not contain the amplitude and phase calibrators
fieldids = set(fieldids)
if fieldids.isdisjoint(ref_fieldid):
LOG.warning('%s does not contain results for the reference calibrator' % os.path.basename(caltable))
if not fieldids.issuperset(transfer_fieldids):
LOG.warning('%s does not contain results for all transfer calibrators' % os.path.basename(caltable))
return True
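# A minimal sketch of the check above with hypothetical field IDs: if the
# reference field is ID 0 and the transfer fields are IDs 1-3, but the
# caltable FIELD_ID column only holds {0, 1, 2}, then
#     {0, 1, 2}.isdisjoint({0})           -> False (reference present, no warning)
#     {0, 1, 2}.issuperset({1, 2, 3})     -> False (missing transfer field, warn)
# and the method still returns True; the warnings are advisory only.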
def _compute_calvis_flux(self, fieldid, spwid):
# Read in data from the MS, for specified field, spw, and all transfer
# intents.
data = self._read_data_from_ms(self.inputs.ms, fieldid, spwid, self.inputs.transintent)
# Return if no valid data were read.
if not data:
return
# Get number of correlations for this spw.
corr_type = commonhelpermethods.get_corr_products(self.inputs.ms, int(spwid))
ncorrs = len(corr_type)
# Select which correlations to consider for computing the mean
# visibility flux:
# - for single and dual pol, consider all correlations.
# - for ncorr == 4 with linear correlation products (XX, XY, etc)
# select the XX and YY columns.
# - for other values of ncorrs, or e.g. circular correlation (LL, RL)
# raise a warning that these are not handled.
if ncorrs in [1, 2]:
columns_to_select = range(ncorrs)
elif ncorrs == 4 and set(corr_type) == {'XX', 'XY', 'YX', 'YY'}:
columns_to_select = [corr_type.index('XX'), corr_type.index('YY')]
else:
LOG.warning("Unexpected polarisations found for MS {}, unable to compute mean visibility fluxes."
"".format(self.inputs.ms.basename))
columns_to_select = []
# Derive mean flux and variance for each polarisation.
mean_fluxes = []
variances = []
for col in columns_to_select:
# Select data for current polarisation.
ampdata = np.squeeze(data['corrected_data'], axis=1)[col]
flagdata = np.squeeze(data['flag'], axis=1)[col]
weightdata = data['weight'][col]
# Select for non-flagged data and non-NaN data.
id_nonbad = np.where(np.logical_and(np.logical_not(flagdata), np.isfinite(ampdata)))
amplitudes = ampdata[id_nonbad]
weights = weightdata[id_nonbad]
# If no valid data are available, skip to the next polarisation.
if len(amplitudes) == 0:
continue
# Determine number of non-flagged antennas, baselines, and
# integrations.
ant1 = data['antenna1'][id_nonbad]
ant2 = data['antenna2'][id_nonbad]
n_ants = len(set(ant1) | set(ant2))
n_baselines = n_ants * (n_ants - 1) // 2
n_ints = len(amplitudes) // n_baselines
# PIPE-644: Determine scale factor for variance.
var_scale = np.mean([len(amplitudes), n_ints * n_ants])
# Compute mean flux and stdev for current polarisation.
mean_flux = np.abs(np.average(amplitudes, weights=weights))
variance = np.average((np.abs(amplitudes) - mean_flux)**2, weights=weights) / var_scale
# Store for this polarisation.
mean_fluxes.append(mean_flux)
variances.append(variance)
# Compute mean flux and mean stdev for all polarisations.
if mean_fluxes:
mean_flux = np.mean(mean_fluxes)
mean_std = np.sqrt(np.mean(variances))
# If no valid data was found for any polarisation, then set flux and
# uncertainty to zero.
else:
mean_flux = 0
mean_std = 0
LOG.debug("No valid data found for MS {}, field {}, spw {}, unable to compute mean visibility flux; flux"
" and uncertainty will be set to zero.".format(self.inputs.ms.basename, fieldid, spwid))
# Combine into single flux measurement object.
flux = domain.FluxMeasurement(spwid, mean_flux, origin=ORIGIN)
flux.uncertainty = domain.FluxMeasurement(spwid, mean_std, origin=ORIGIN)
return flux
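# A standalone numpy sketch (hypothetical values) of the statistic computed
# above: the flux is the weight-averaged visibility amplitude, and the
# variance is scaled down by roughly the number of independent samples
# (PIPE-644) before taking the square root as the 1-sigma uncertainty.
#
#     import numpy as np
#     amplitudes = np.array([1.02, 0.98, 1.01, 0.99])  # |V| in Jy (hypothetical)
#     weights = np.array([1.0, 2.0, 1.0, 2.0])
#     n_ants, n_ints = 3, 2                            # hypothetical counts
#     var_scale = np.mean([len(amplitudes), n_ints * n_ants])
#     mean_flux = np.abs(np.average(amplitudes, weights=weights))
#     variance = np.average((np.abs(amplitudes) - mean_flux) ** 2,
#                           weights=weights) / var_scale
#     mean_std = np.sqrt(variance)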
def _derive_calvis_flux(self):
"""
Derive calibrated visibility fluxes.
To compute the "calibrated" fluxes, this method will "temporarily"
apply the existing calibration tables, including the new phase and
amplitude caltables created earlier during the hifa_gfluxscale task.
First, create a back-up of the MS flagging state, then run an applycal
for the necessary intents and fields. Next, compute the calibrated
visibility fluxes. Finally, always restore the back-up of the MS
flagging state, to undo any flags that were propagated from the applied
caltables.
:return: commonfluxresults.FluxCalibrationResults containing the
calibrated visibility fluxes and uncertainties.
"""
inputs = self.inputs
# Identify fields and spws to derive calibrated vis for.
transfer_fields = inputs.ms.get_fields(task_arg=inputs.transfer)
sci_spws = set(inputs.ms.get_spectral_windows(science_windows_only=True))
transfer_fieldids = {str(field.id) for field in transfer_fields}
spw_ids = {str(spw.id) for field in transfer_fields for spw in field.valid_spws.intersection(sci_spws)}
# Create back-up of MS flagging state.
LOG.info('Creating back-up of flagging state')
flag_backup_name = 'before_gfluxscale_calvis'
task = casa_tasks.flagmanager(vis=inputs.vis, mode='save', versionname=flag_backup_name)
self._executor.execute(task)
# Run computation of calibrated visibility fluxes in try/finally to
# ensure that the MS flagging state is always restored, even in case of an exception.
try:
# Apply all caltables registered in the callibrary in the local
# context to the MS.
LOG.info('Applying pre-existing caltables and preliminary phase-up and amplitude caltables.')
acinputs = applycal.IFApplycalInputs(context=inputs.context, vis=inputs.vis, field=inputs.transfer,
intent=inputs.transintent, flagsum=False, flagbackup=False)
actask = applycal.IFApplycal(acinputs)
self._executor.execute(actask)
# Initialize result.
result = commonfluxresults.FluxCalibrationResults(inputs.vis)
# Compute the mean calibrated visibility flux for each field and
# spw and add as flux measurement to the final result.
for fieldid in transfer_fieldids:
for spwid in spw_ids:
fluxes_for_field_and_spw = self._compute_calvis_flux(fieldid, spwid)
if fluxes_for_field_and_spw:
result.measurements[fieldid].append(fluxes_for_field_and_spw)
finally:
# Restore the MS flagging state.
LOG.info('Restoring back-up of flagging state.')
task = casa_tasks.flagmanager(vis=inputs.vis, mode='restore', versionname=flag_backup_name)
self._executor.execute(task)
return result
def _derive_fluxscale_flux(self, caltable, refspwmap):
inputs = self.inputs
# Schedule a fluxscale job using this caltable. This is the result
# that contains the flux measurements for the context.
# We need to write the fluxscale-derived flux densities to a file,
# which can then be used as input for the subsequent setjy task.
# This is the name of that file.
# use UUID so that parallel MPI processes do not unlink the same file
reffile = os.path.join(inputs.context.output_dir, 'fluxscale_{!s}.csv'.format(uuid.uuid4()))
try:
fluxscale_result = self._do_fluxscale(caltable, refspwmap=refspwmap)
# Determine field IDs for which a model spix should be
# set along with the derived flux. For now this is
# restricted to BANDPASS fields
fieldids_with_spix = [str(f.id) for f in inputs.ms.get_fields(task_arg=inputs.transfer, intent='BANDPASS')]
# Store the results in a temporary file.
fluxes.export_flux_from_fit_result(fluxscale_result, inputs.context, reffile,
fieldids_with_spix=fieldids_with_spix)
# Finally, do a setjy, add its setjy_settings
# to the main result
self._do_setjy(reffile=reffile, field=inputs.transfer)
# Use the fluxscale measurements to get the uncertainties too.
# This makes the (big) assumption that setjy executed exactly
# what we passed in as arguments.
finally:
# clean up temporary file
if os.path.exists(reffile):
os.remove(reffile)
return fluxscale_result
def _do_gaincal(self, caltable=None, field=None, intent=None, gaintype='G', calmode=None, combine=None, solint=None,
antenna=None, uvrange='', refant=None, minblperant=None, phaseup_spwmap=None, phase_interp=None,
append=None, merge=True):
inputs = self.inputs
# Use only valid science spws
fieldlist = inputs.ms.get_fields(task_arg=field)
sci_spws = set(inputs.ms.get_spectral_windows(science_windows_only=True))
spw_ids = {spw.id for fld in fieldlist for spw in fld.valid_spws.intersection(sci_spws)}
spw_ids = ','.join(map(str, spw_ids))
task_args = {
'output_dir': inputs.output_dir,
'vis': inputs.vis,
'caltable': caltable,
'field': field,
'intent': intent,
'spw': spw_ids,
'solint': solint,
'gaintype': gaintype,
'calmode': calmode,
'minsnr': inputs.minsnr,
'combine': combine,
'refant': refant,
'antenna': antenna,
'uvrange': uvrange,
'minblperant': minblperant,
'solnorm': False,
'append': append
}
# Note that field and antenna take their default values for the
# purpose of setting up the calto object.
task_inputs = gaincal.GTypeGaincal.Inputs(inputs.context, **task_args)
task = gaincal.GTypeGaincal(task_inputs)
# Execute task
result = self._executor.execute(task)
# Merge
if merge:
overrides = {}
# Adjust the spwmap for the combine case
if inputs.ms.combine_spwmap and phase_interp:
overrides['interp'] = phase_interp
# Adjust the spw map
if phaseup_spwmap:
overrides['spwmap'] = phaseup_spwmap
if overrides:
original_calapp = result.pool[0]
modified_calapp = callibrary.copy_calapplication(original_calapp, **overrides)
result.pool[0] = modified_calapp
result.final[0] = modified_calapp
# Merge result to the local context
result.accept(inputs.context)
return result
def _do_fluxscale(self, caltable=None, refspwmap=None):
inputs = self.inputs
task_args = {
'output_dir': inputs.output_dir,
'vis': inputs.vis,
'caltable': caltable,
'reference': inputs.reference,
'transfer': inputs.transfer,
'refspwmap': refspwmap
}
task_inputs = fluxscale.Fluxscale.Inputs(inputs.context, **task_args)
task = fluxscale.Fluxscale(task_inputs)
return self._executor.execute(task, merge=True)
def _do_setjy(self, reffile=None, field=None):
inputs = self.inputs
task_args = {
'output_dir': inputs.output_dir,
'vis': inputs.vis,
'field': field,
'intent': inputs.transintent,
'reffile': reffile
}
task_inputs = setjy.Setjy.Inputs(inputs.context, **task_args)
task = setjy.Setjy(task_inputs)
return self._executor.execute(task, merge=True)
@staticmethod
def _read_data_from_ms(ms, fieldid, spwid, intent):
# Initialize data selection.
data_selection = {'field': fieldid,
'scanintent': '*%s*' % utils.to_CASA_intent(ms, intent),
'spw': spwid}
# Get number of channels for this spw.
nchans = ms.get_spectral_windows(spwid)[0].num_channels
# Read in data from MS.
with casa_tools.MSReader(ms.name) as openms:
try:
# Apply data selection.
openms.msselect(data_selection)
# Read in the weights from MS separately, prior to channel
# averaging.
weights = openms.getdata(['weight'])
# Set channel selection to take the average of all channels.
openms.selectchannel(1, 0, nchans, 1)
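# (Assuming the standard ms tool signature selectchannel(nchan, start,
# width, inc): request 1 output channel of width nchans starting at
# channel 0, i.e. average the full spw into a single channel.)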
# Extract data from MS.
data = openms.getdata(['corrected_data', 'flag', 'antenna1', 'antenna2'])
except Exception:
# Log a warning and return without data.
LOG.warning('Unable to compute mean vis for intent(s) {}, field {}, spw {}'
''.format(intent, fieldid, spwid))
return
# Combine the two dictionaries.
data = {**data, **weights}
return data
def _replace_amplitude_caltable(self, ampresult, fsresult):
inputs = self.inputs
# Identify the MS to process.
vis = os.path.basename(inputs.vis)
# predicate function to match hifa_gfluxscale amplitude caltable for this MS
def gfluxscale_amp_matcher(calto: callibrary.CalToArgs, calfrom: callibrary.CalFrom) -> bool:
calto_vis = {os.path.basename(v) for v in calto.vis}
# Standard caltable filenames contain task identifiers,
# caltable type identifiers, etc. We can use this to identify
# caltables created by this task. As an extra check we also
# check the caltable type.
do_delete = 'hifa_gfluxscale' in calfrom.gaintable and 'gaincal' in calfrom.caltype and vis in calto_vis \
and 'gacal' in calfrom.gaintable
if do_delete:
LOG.debug(f'Unregistering previous amplitude calibrations for {vis}')
return do_delete
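# For example, a hypothetical gaintable name such as
# 'uid___A002_Xabc.ms.hifa_gfluxscale.s18_2.spw0.gacal.tbl' of caltype
# 'gaincal', registered against this MS, would be matched and unregistered.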
inputs.context.callibrary.unregister_calibrations(gfluxscale_amp_matcher)
# Add caltable from fluxscale result to local context callibrary.
orig_calapp = ampresult.pool[0]
new_calapp = callibrary.copy_calapplication(orig_calapp, gaintable=fsresult.inputs['fluxtable'])
LOG.debug(f'Adding calibration to callibrary:\n{new_calapp.calto}\n{new_calapp.calfrom}')
inputs.context.callibrary.add(new_calapp.calto, new_calapp.calfrom)
AMPLITUDE_MISSING = '__AMPLITUDE_MISSING__'
@task_registry.set_equivalent_casa_task('session_gfluxscale')
class SessionGcorFluxscale(basetask.StandardTaskTemplate):
Inputs = SessionGcorFluxscaleInputs
def __init__(self, inputs):
super(SessionGcorFluxscale, self).__init__(inputs)
is_multi_vis_task = True
def prepare(self):
inputs = self.inputs
vis_list = sessionutils.as_list(inputs.vis)
assessed = []
with sessionutils.VDPTaskFactory(inputs, self._executor, GcorFluxscale) as factory:
task_queue = [(vis, factory.get_task(vis)) for vis in vis_list]
for (vis, (task_args, task)) in task_queue:
# only launch jobs for MSes with amplitude calibrators.
# The analyse() method will subsequently adopt the
# appropriate flux calibration measurements from one of
# the completed jobs.
ms = inputs.context.observing_run.get_ms(vis)
if 'AMPLITUDE' not in ms.intents:
assessed.append(sessionutils.VisResultTuple(vis, task_args, AMPLITUDE_MISSING))
continue
try:
worker_result = task.get_result()
except exceptions.PipelineException as e:
assessed.append(sessionutils.VisResultTuple(vis, task_args, e))
else:
assessed.append(sessionutils.VisResultTuple(vis, task_args, worker_result))
return assessed
def analyse(self, assessed):
# all results will be added to this object
final_result = basetask.ResultsList()
context = self.inputs.context
session_groups = sessionutils.group_into_sessions(context, assessed)
for session_id, session_results in session_groups.items():
# we need to convert the Field ID to field name in the
# measurements
measurements_per_field = collect_flux_measurements(context, session_results)
averaged = calc_averages_per_field(measurements_per_field)
for vis, task_args, vis_result in session_results:
if vis_result == AMPLITUDE_MISSING:
no_amplitude_ms = context.observing_run.get_ms(vis)
# find other flux calibrations for any of our fields
no_amplitude_field_names = {f.name for f in no_amplitude_ms.fields}
fields_to_adopt = no_amplitude_field_names.intersection(set(averaged.keys()))
if len(fields_to_adopt) == 0:
LOG.error('Could not find a flux calibration to adopt for '
'{!s}.'.format(no_amplitude_ms.basename))
continue
LOG.info('Adopting flux calibrations for {!s}; fields: {!s}'
''.format(no_amplitude_ms.basename, ', '.join(fields_to_adopt)))
# these are the measurements to adopt, but the spw
# names still need to be remapped to spw IDs for
# this MS
unmapped_adopted = {k: v for k, v in averaged.items() if k in no_amplitude_field_names}
mapped_adopted = map_spw_names_to_id(context, vis, unmapped_adopted)
fake_result = GcorFluxscaleResults(vis=vis, measurements=mapped_adopted, applies_adopted=True)
fake_result.inputs = task_args
fake_result.task = SessionGcorFluxscale
final_result.append(fake_result)
elif isinstance(vis_result, Exception):
LOG.error('No flux calibration created for {!s}'.format(os.path.basename(vis)))
fake_result = GcorFluxscaleResults(vis=vis)
fake_result.inputs = task_args
final_result.append(fake_result)
else:
final_result.append(vis_result)
return final_result
def get_field_name(context, vis, identifier):
ms = context.observing_run.get_ms(vis)
fields = set(ms.get_fields(task_arg=identifier))
if len(fields) != 1:
raise KeyError('{!r} does not uniquely identify a field: ({!s} matches found)'
''.format(identifier, len(fields)))
fields = fields.pop()
return fields.name
def collect_flux_measurements(context, vis_result_tuples):
"""
Compile the flux measurements from a set of results into a new
dict data structure.
:param context: the pipeline context
:param vis_result_tuples: the VisResultTuples to inspect
:type vis_result_tuples: list of VisResultTuples
:return: dict of tuples
:rtype: dict of {str field name: (vis, spw name, flux measurement)}
"""
d = collections.defaultdict(list)
for vis, _, result in vis_result_tuples:
if result == AMPLITUDE_MISSING:
continue
ms = context.observing_run.get_ms(vis)
for field_id, measurements in result.measurements.items():
field_name = get_field_name(context, vis, field_id)
for m in measurements:
spws = ms.get_spectral_windows(task_arg=m.spw_id)
assert len(spws) == 1
spw = spws.pop()
d[field_name].append((vis, spw.name, m))
return d
def calc_averages_per_field(results):
"""
Return compiled and averaged flux calibrations per field.
:param results:
:return:
"""
averages = collections.defaultdict(list)
for field_name, measurement_structs in results.items():
spw_names = {spw_name for _, spw_name, _ in measurement_structs}
for spw_name in spw_names:
measurements_for_spw = [measurement for _, name, measurement in measurement_structs
if name == spw_name]
if len(measurements_for_spw) == 0:
continue
mean = reduce(operator.add, measurements_for_spw) / len(measurements_for_spw)
# copy the uncertainty if there's only one measurement,
# otherwise calculate the standard error of the mean.
if len(measurements_for_spw) == 1:
m = measurements_for_spw[0]
unc_I = m.uncertainty.I
unc_Q = m.uncertainty.Q
unc_U = m.uncertainty.U
unc_V = m.uncertainty.V
else:
JY = measures.FluxDensityUnits.JANSKY
unc_I = stats.sem([float(m.I.to_units(JY)) for m in measurements_for_spw])
unc_Q = stats.sem([float(m.Q.to_units(JY)) for m in measurements_for_spw])
unc_U = stats.sem([float(m.U.to_units(JY)) for m in measurements_for_spw])
unc_V = stats.sem([float(m.V.to_units(JY)) for m in measurements_for_spw])
# floats are interpreted as Jy, so we don't need to convert
# SEM values
mean.uncertainty = FluxMeasurement(spw_name, unc_I, Q=unc_Q, U=unc_U, V=unc_V, origin=ORIGIN)
averages[field_name].append((spw_name, mean))
return averages
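# A small standalone illustration (hypothetical values) of the standard
# error of the mean adopted above when a spw has multiple measurements:
#
#     import scipy.stats as stats
#     fluxes_jy = [1.10, 1.14, 1.08]   # same field and spw, three MSes
#     stats.sem(fluxes_jy)             # ~0.0176 -> uncertainty in Jy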
def map_spw_names_to_id(context, vis, field_measurements):
"""
Copy a flux result dict, remapping the target spectral window in
the original result to a new measurement set.
This function makes a copy of a dict of flux calibration results
(keys=field names, values=FluxMeasurements), remapping the spectral
window target in the results to the corresponding spectral window
in the target measurement set.
:param context: pipeline context
:param vis: name of the measurement set to remap spws to
:param field_measurements: flux calibrations to adopt
:type field_measurements: dict with format {str: [FluxMeasurements]}
:return: flux results remapped to measurement set
:rtype: dict with format {str: [FluxMeasurements]}
"""
ms = context.observing_run.get_ms(vis)
science_spws = ms.get_spectral_windows(science_windows_only=True)
spw_names_to_id = {spw.name: spw.id for spw in science_spws}
# spw names must uniquely identify a science spw, otherwise we
# can't create a correct spw ID mapping
assert (len(spw_names_to_id) == len(science_spws))
d = {field_name: [copy_flux_measurement(m, spw_id=spw_names_to_id[spw_name])
for spw_name, m in measurements if spw_name in spw_names_to_id]
for field_name, measurements in field_measurements.items()}
return d
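# For example (hypothetical names and IDs): if the target MS maps spw name
# 'BB_1#SW-01' to spw ID 17, an adopted entry
#     {'J1256-0547': [('BB_1#SW-01', <FluxMeasurement>)]}
# is returned as
#     {'J1256-0547': [<copy of that FluxMeasurement with spw_id=17>]}.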
def copy_flux_measurement(source, spw_id=None, I=None, Q=None, U=None, V=None, spix=None, uI=None, uQ=None, uU=None,
uV=None):
if spw_id is None:
spw_id = source.spw_id
if I is None:
I = source.I
if Q is None:
Q = source.Q
if U is None:
U = source.U
if V is None:
V = source.V
if spix is None:
spix = source.spix
new_fm = FluxMeasurement(spw_id, I, Q=Q, U=U, V=V, spix=spix, origin=ORIGIN)
if uI is None:
uI = source.uncertainty.I
if uQ is None:
uQ = source.uncertainty.Q
if uU is None:
uU = source.uncertainty.U
if uV is None:
uV = source.uncertainty.V
new_fm.uncertainty = FluxMeasurement(spw_id, uI, Q=uQ, U=uU, V=uV, origin=ORIGIN)
return new_fm