"""
Created on 23 Oct 2014
@author: sjw
"""
import collections
import decimal
import itertools
import math
import operator
import os
import matplotlib.pyplot as plt
import pipeline.domain.measures as measures
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.renderer.basetemplates as basetemplates
import pipeline.infrastructure.utils as utils
from pipeline.domain.measures import FluxDensityUnits, FrequencyUnits
from pipeline.h.tasks.common import atmutil
from pipeline.h.tasks.importdata.fluxes import ORIGIN_XML, ORIGIN_ANALYSIS_UTILS
from pipeline.infrastructure.renderer import logger
from . import display as gfluxscale
from ..importdata.dbfluxes import ORIGIN_DB
LOG = logging.get_logger(__name__)
CATALOGUE_SOURCES = (ORIGIN_ANALYSIS_UTILS, ORIGIN_DB, ORIGIN_XML)
[docs]class T2_4MDetailsGFluxscaleRenderer(basetemplates.T2_4MDetailsDefaultRenderer):
def __init__(self, uri='gfluxscale.mako',
description='Transfer fluxscale from amplitude calibrator',
always_rerender=False):
super(T2_4MDetailsGFluxscaleRenderer, self).__init__(
uri=uri, description=description, always_rerender=always_rerender)
[docs] def update_mako_context(self, mako_context, pipeline_context, results):
# All antenna, sort by baseband
ampuv_allant_plots = collections.defaultdict(dict)
for intents in ['AMPLITUDE']:
plots = self.create_plots(pipeline_context, results, gfluxscale.GFluxscaleSummaryChart, intents)
self.sort_plots_by_baseband(plots)
for vis, vis_plots in plots.items():
if len(vis_plots) > 0:
ampuv_allant_plots[vis][intents] = vis_plots
# List of antenna for the fluxscale result, sorted by baseband
ampuv_ant_plots = collections.defaultdict(dict)
for intents in ['AMPLITUDE']:
plots = self.create_plots_ants(pipeline_context, results, gfluxscale.GFluxscaleSummaryChart, intents)
self.sort_plots_by_baseband(plots)
for vis, vis_plots in plots.items():
if len(vis_plots) > 0:
ampuv_ant_plots[vis][intents] = vis_plots
flux_comparison_plots = self.create_flux_comparison_plots(pipeline_context, results)
table_rows = make_flux_table(pipeline_context, results)
adopted_rows = make_adopted_table(results)
mako_context.update({
'adopted_table': adopted_rows,
'ampuv_allant_plots': ampuv_allant_plots,
'ampuv_ant_plots': ampuv_ant_plots,
'flux_plots': flux_comparison_plots,
'table_rows': table_rows
})
[docs] @staticmethod
def sort_plots_by_baseband(d):
for vis, plots in d.items():
plots = sorted(plots, key=lambda plot: plot.parameters['baseband'])
d[vis] = plots
[docs] @staticmethod
def create_flux_comparison_plots(context, results):
output_dir = os.path.join(context.report_dir, 'stage%s' % results.stage_number)
d = {}
for result in results:
vis = os.path.basename(result.inputs['vis'])
d[vis] = create_flux_comparison_plots(context, output_dir, result)
return d
[docs] def create_plots(self, context, results, plotter_cls, intents, renderer_cls=None):
"""
Create plots and return a dictionary of vis:[Plots]. No antenna or UVrange selection.
"""
d = {}
for result in results:
plots = self.plots_for_result(context, result, plotter_cls, intents, renderer_cls)
d = utils.dict_merge(d, plots)
return d
[docs] def create_plots_ants(self, context, results, plotter_cls, intents, renderer_cls=None):
"""
Create plots and return a dictionary of vis:[Plots].
Antenna and UVrange selection determined by heuristics.
"""
d = {}
for result in results:
# PIPE-33: when all antennas are selected on the ampcal, suppress the second set of amp(uvdist;model) plot
if result.resantenna == '':
continue
plots = self.plots_for_result(context, result, plotter_cls, intents, renderer_cls, ant=result.resantenna,
uvrange=result.uvrange)
d = utils.dict_merge(d, plots)
return d
[docs] @staticmethod
def plots_for_result(context, result, plotter_cls, intents, renderer_cls=None, ant='', uvrange=''):
vis = os.path.basename(result.inputs['vis'])
output_dir = os.path.join(context.report_dir, 'stage%s' % result.stage_number)
# create a fake CalTo object so we can use the applycal class
fields = result.inputs['reference']
calto = callibrary.CalTo(result.inputs['vis'], field=fields)
plotter = plotter_cls(context, output_dir, calto, intents, ant=ant, uvrange=uvrange)
plots = plotter.plot()
d = collections.defaultdict(dict)
d[vis] = plots
if renderer_cls is not None:
renderer = renderer_cls(context, result, plots)
with renderer.get_file() as fileobj:
fileobj.write(renderer.render())
return d
FluxTR = collections.namedtuple('FluxTR', 'vis field spw freqbw i q u v fluxratio spix')
[docs]def make_flux_table(context, results):
# will hold all the flux stat table rows for the results
rows = []
for single_result in results:
ms_for_result = context.observing_run.get_ms(single_result.vis)
vis_cell = os.path.basename(single_result.vis)
transintent = set(single_result.inputs['transintent'].split(','))
# measurements will be empty if calibrated visibility flux derivation failed
if len(single_result.measurements) == 0:
continue
for field_arg in sorted(single_result.measurements, key=lambda f: ms_for_result.get_fields(f)[0].id):
field = ms_for_result.get_fields(field_arg)[0]
intents = " ". join(sorted(field.intents.intersection(transintent)))
field_cell = '%s (#%s) %s' % (field.name, field.id, intents)
for measurement in sorted(single_result.measurements[field_arg], key=operator.attrgetter('spw_id')):
spw = ms_for_result.get_spectral_window(measurement.spw_id)
freqbw = '%s %s' % (str(spw.centre_frequency), str(spw.bandwidth))
fluxes = collections.defaultdict(lambda: 'N/A')
for stokes in ['I', 'Q', 'U', 'V']:
try:
flux = getattr(measurement, stokes)
unc = getattr(measurement.uncertainty, stokes)
flux_jy = flux.to_units(measures.FluxDensityUnits.JANSKY)
if stokes == 'I':
flux_jy_I = flux_jy
unc_jy = unc.to_units(measures.FluxDensityUnits.JANSKY)
if flux_jy != 0 and unc_jy != 0:
unc_ratio = decimal.Decimal('100')*(unc_jy/flux_jy)
uncertainty = ' ± %s (%0.1f%%)' % (str(unc), unc_ratio)
else:
uncertainty = ''
fluxes[stokes] = '%s%s' % (flux, uncertainty)
except:
pass
try:
fluxes['spix'] = '%s' % getattr(measurement, 'spix')
except:
fluxes['spix'] = '0.0'
# Get the corresponding catalog flux
catfluxes = collections.defaultdict(lambda: 'N/A')
flux_ratio = 'N/A'
cat_measurements = [o for o in field.flux_densities if o.origin in CATALOGUE_SOURCES]
for catmeasurement in cat_measurements:
if catmeasurement.spw_id != int(measurement.spw_id):
continue
for stokes in ['I', 'Q', 'U', 'V']:
try:
catflux = getattr(catmeasurement, stokes)
catflux_jy = catflux.to_units(measures.FluxDensityUnits.JANSKY)
if stokes == 'I':
catflux_jy_I = catflux_jy
catfluxes[stokes] = ' %s' % (catflux)
except:
pass
try:
catfluxes['spix'] = '%s' % getattr(catmeasurement, 'spix')
except:
catfluxes['spix'] = '0.0'
if fluxes['I'] != 'N/A' and catfluxes['I'] != 'N/A':
flux_ratio = '%0.3f' % (float(flux_jy_I) / float(catflux_jy_I))
break
# Get the corresponding fluxscale derived fluxes.
fsfluxes = collections.defaultdict(lambda: 'N/A')
fs_measurements = single_result.fluxscale_measurements
if str(field.id) in fs_measurements:
for fs_measurement in fs_measurements[str(field.id)]:
if fs_measurement.spw_id != int(measurement.spw_id):
continue
for stokes in ['I', 'Q', 'U', 'V']:
try:
fsflux = getattr(fs_measurement, stokes)
fsunc = getattr(fs_measurement.uncertainty, stokes)
fsflux_jy = fsflux.to_units(measures.FluxDensityUnits.JANSKY)
fsunc_jy = fsunc.to_units(measures.FluxDensityUnits.JANSKY)
if fsflux_jy != 0 and fsunc_jy != 0:
fsunc_ratio = decimal.Decimal('100') * (fsunc_jy / fsflux_jy)
fsunc_str = ' ± %s (%0.1f%%)' % (str(fsunc), fsunc_ratio)
else:
fsunc_str = ''
fsfluxes[stokes] = '%s%s' % (fsflux, fsunc_str)
except:
pass
try:
fsfluxes['spix'] = '%s' % getattr(fs_measurement, 'spix')
except:
fsfluxes['spix'] = '0.0'
break
# Create the table row for current result (vis), field, and spw.
tr = FluxTR(vis_cell, field_cell, measurement.spw_id, freqbw,
fsfluxes['I'],
fsfluxes['Q'],
fsfluxes['U'],
fsfluxes['V'],
flux_ratio,
fluxes['spix'])
rows.append(tr)
tr = FluxTR(vis_cell, field_cell, measurement.spw_id, freqbw,
fluxes['I'],
fluxes['Q'],
fluxes['U'],
fluxes['V'],
flux_ratio,
fluxes['spix'])
rows.append(tr)
tr = FluxTR(vis_cell, field_cell, measurement.spw_id, freqbw,
catfluxes['I'],
catfluxes['Q'],
catfluxes['U'],
catfluxes['V'],
flux_ratio,
fluxes['spix'])
rows.append(tr)
return utils.merge_td_columns(rows)
AdoptedTR = collections.namedtuple('AdoptedTR', 'vis fields')
[docs]def make_adopted_table(results):
# will hold all the flux stat table rows for the results
rows = []
for adopted_result in [r for r in results if r.applies_adopted]:
vis_cell = os.path.basename(adopted_result.vis)
field_cell = ', '.join(str(x) for x in adopted_result.measurements)
tr = AdoptedTR(vis_cell, field_cell)
rows.append(tr)
return utils.merge_td_columns(rows)
[docs]def create_flux_comparison_plots(context, output_dir, result, showatm=True):
ms = context.observing_run.get_ms(result.vis)
plots = []
for field_id, measurements in result.measurements.items():
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
fields = ms.get_fields(task_arg=field_id)
assert len(fields) == 1
field = fields[0]
ax.set_xlabel('Frequency (GHz)')
ax.set_ylabel('Flux Density (Jy)')
# Avoid offset values (PIPE-644)
ax.yaxis.set_major_formatter(plt.ScalarFormatter(useOffset=False))
colours = itertools.cycle('bgrcmyk')
x_min = 1e99
x_max = 0
for m in sorted(measurements, key=operator.attrgetter('spw_id')):
# cycle colours so that windows centred on the same frequency are distinguishable
colour = next(colours)
spw = ms.get_spectral_window(m.spw_id)
x = spw.centre_frequency.to_units(FrequencyUnits.GIGAHERTZ)
x_unc = decimal.Decimal('0.5') * spw.bandwidth.to_units(FrequencyUnits.GIGAHERTZ)
# Plot calibrated fluxes. PIPE-566: if both flux and uncertainty
# are zero, then do not plot the value to avoid affecting the
# automatic y-range.
y = m.I.to_units(FluxDensityUnits.JANSKY)
y_unc = m.uncertainty.I.to_units(FluxDensityUnits.JANSKY)
if not (y == 0 and y_unc == 0):
label = 'Calibrated data flux for spw {}'.format(spw.id)
ax.errorbar(x, y, xerr=x_unc, yerr=y_unc, fmt='{!s}-o'.format(colour), label=label)
x_min = min(x_min, x - x_unc)
x_max = max(x_max, x + x_unc)
# Plot fluxes from ASDM, catalog, and/or analysisUtils.
catalogue_fluxes = {
ORIGIN_XML: 'ASDM',
ORIGIN_DB: 'online catalogue',
ORIGIN_ANALYSIS_UTILS: 'analysisUtils'
}
ages = []
for origin, label in catalogue_fluxes.items():
fluxes = [f for f in field.flux_densities if f.origin == origin]
if not fluxes:
continue
ages.extend([f.age for f in fluxes])
spws = [ms.get_spectral_window(f.spw_id) for f in fluxes]
x = [spw.centre_frequency.to_units(FrequencyUnits.GIGAHERTZ) for spw in spws]
y = [f.I.to_units(FluxDensityUnits.JANSKY) for f in fluxes]
spix = [float(f.spix) for f in fluxes]
# sort by frequency
x, y, spix = list(zip(*sorted(zip(x, y, spix))))
# PIPE-644: always plot catalog fluxes in black.
colour = "k"
ax.plot(x, y, marker='o', color=colour, label='Data source: {}'.format(label))
s_xmin = scale_flux(x[0], y[0], x_min, spix[0])
s_xmax = scale_flux(x[-1], y[-1], x_max, spix[-1])
ax.plot([x[0], x_min], [y[0], s_xmin], color=colour, label='Spectral index extrapolation',
linestyle='dotted')
ax.plot([x[-1], x_max], [y[-1], s_xmax], color=colour, label='_nolegend_', linestyle='dotted')
# Check if catalog fluxes share a single age that is not None, and take
# this to represent to catalog flux age; otherwise set age to N/A.
uniq_ages = set([age for age in ages if age is not None])
if len(uniq_ages) == 1:
age = uniq_ages.pop()
else:
age = 'N/A'
# Add plot title.
# PIPE-644: include age of catalog fluxes in title.
title_str = "Flux calibration: {} (age = {} days)".format(field.name, age)
ax.set_title(title_str)
# Plot atmospheric transmission.
if showatm:
atm_color = 'm'
# Create 2nd axis for atmospheric transmission.
axes_atm = ax.twinx()
axes_atm.set_ylabel('ATM Transmission', color=atm_color, labelpad=2)
axes_atm.set_ylim(0, 1.05)
axes_atm.tick_params(direction='out', colors=atm_color)
axes_atm.yaxis.set_major_formatter(plt.FuncFormatter(lambda t, pos: '{}%'.format(int(t * 100))))
axes_atm.yaxis.tick_right()
# Select antenna to use for determining atmospheric transmission:
# Preferably use highest ranked reference antenna, otherwise
# use antenna ID = 0.
ant_id = 0
if hasattr(ms, 'reference_antenna') and isinstance(ms.reference_antenna, str):
ant_id = ms.get_antenna(search_term=ms.reference_antenna.split(',')[0])[0].id
# For each spw in the flux measurements, compute and plot the
# atmospheric transmission vs. frequency.
spw_ids = sorted([m.spw_id for m in measurements])
for spw_id in spw_ids:
atm_freq, atm_transmission = atmutil.get_transmission(vis=result.vis, spw_id=spw_id, antenna_id=ant_id)
axes_atm.plot(atm_freq, atm_transmission, color=atm_color, linestyle='-')
# Include plot legend.
leg = ax.legend(loc='best', numpoints=1, prop={'size': 8})
leg.get_frame().set_alpha(0.5)
figfile = '{}-field{}-flux_calibration.png'.format(ms.basename, field_id)
# Save figure to file.
full_path = os.path.join(output_dir, figfile)
fig.savefig(full_path)
# Create a wrapper for current plot, and append to list of plots.
parameters = {
'vis': ms.basename,
'field': field.name,
'field_id': field.id,
'intent': sorted(set(field.intents))
}
wrapper = logger.Plot(full_path, x_axis='frequency', y_axis='Flux Density', parameters=parameters)
plots.append(wrapper)
return plots
[docs]def scale_flux(f1, s1, f2, spix):
"""Returns flux at a frequency by extrapolating via the spectral index.
:param f1: frequency 1
:param s1: flux density at frequency 1
:param f2: frequency 2
:param spix: spectral index
:return: flux density at frequency 2
"""
return math.pow(10, spix * math.log10(f2/f1) + math.log10(s1))