import os
import collections
import shutil
import itertools
import operator
import pipeline.infrastructure
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.filenamer as filenamer
import pipeline.infrastructure.logging as logging
import pipeline.infrastructure.renderer.basetemplates as basetemplates
import pipeline.infrastructure.utils as utils
from pipeline.infrastructure import casa_tools
from . import csvfilereader
from pipeline.h.tasks.applycal.renderer import *
from pipeline.h.tasks.common.displays import applycal as applycal
from pipeline.hsd.tasks.common import utils as sdutils
# Module-level logger shared by the renderers and helper functions in this file.
LOG = logging.get_logger(__name__)
# Scaling-factor table row used when virtual spw IDs are handled: the virtual
# spw comes first, followed by MS name, real spw, antenna, pol and factor.
JyperKTRV = collections.namedtuple('JyperKTRV', 'virtualspw msname realspw antenna pol factor')
# Scaling-factor table row used when only real spw IDs are shown.
JyperKTR = collections.namedtuple('JyperKTR', 'spw msname antenna pol factor')
# Flagged / total counts for one row of the flagging summary table. The
# typename matches the module attribute so that instances can be pickled and
# their repr names the class they were created from.
FlagTotal = collections.namedtuple('FlagTotal', 'flagged total')
[docs]class T2_4MDetailsNRORestoreDataRenderer(basetemplates.T2_4MDetailsDefaultRenderer):
def __init__(self, uri='hsdn_restoredata.mako',
             description='Restoredata with scale adjustment between beams for NRO FOREST data.',
             always_rerender=False):
    """
    Create the renderer, forwarding all arguments to the base renderer.

    :param uri: Mako template used for the page
    :param description: description passed on to the base renderer
    :param always_rerender: passed on unchanged to the base renderer
    """
    base_init = super(T2_4MDetailsNRORestoreDataRenderer, self).__init__
    base_init(uri=uri, description=description, always_rerender=always_rerender)
def update_mako_context(self, ctx, context, results):
    """
    Fill the Mako context for the NRO restoredata weblog page.

    The keys added to ctx are 'metadata', 'jyperk_rows', 'reffile',
    'dovirtual', 'flags', 'calapps', 'caltypes', 'agents', 'dirname',
    'filesizes', 'science_amp_vs_freq_plots' and 'uv_max'. In addition,
    ctx['result'].inputs is replaced by a reduced copy of the first
    result's inputs.

    :param ctx: Mako context dictionary, updated in place
    :param context: pipeline Context
    :param results: results list; results[0] must provide ampcal_results
        and applycal_results
    """
    # Only the directory/file related inputs of the first result are shown.
    shown_keys = ('rawdata_dir', 'products_dir', 'vis', 'output_dir', 'reffile')
    result_inputs = {key: value
                     for key, value in ctx['result'][0].inputs.items()
                     if any(token in key for token in shown_keys)}
    LOG.debug('result_inputs = {0}'.format(result_inputs))
    ctx['result'].inputs = result_inputs

    dovirtual = sdutils.require_virtual_spw_id_handling(context.observing_run)

    res0 = results[0]
    ampcal_results = res0.ampcal_results
    applycal_results = res0.applycal_results

    # Metadata block of the factor file. The placeholder is also used when
    # there is no amplitude calibration result at all, so 'metadata' is
    # always defined. Reading stops at the first file that yields nothing.
    metadata = ['No Data : No Data']
    for r in ampcal_results:
        metadata = self._read_reffile_metadata(r.reffile)
        if metadata is None:
            metadata = ['No Data : No Data']
            break
    ctx.update({'metadata': metadata})

    # Rearrange the scaling factors into table rows grouped by (virtual) spw.
    reffile = None
    spw_factors = collections.defaultdict(list)
    for r in ampcal_results:
        ms = context.observing_run.get_ms(name=r.vis)
        vis = ms.basename
        for spw in ms.get_spectral_windows(science_windows_only=True):
            spwid = spw.id
            vspwid = context.observing_run.real2virtual_spw_id(spwid, ms)
            ddid = ms.get_data_description(spw=spwid)
            # the polarization labels depend on the spw only, not the antenna
            corrs = [ddid.get_polarization_label(i) for i in range(ddid.num_polarizations)]
            for ant in ms.get_antenna():
                LOG.debug('ant = {0}'.format(ant))
                ant_name = ant.name
                # collapse pol rows: corr_collector[factor] = [corr0, corr1, ...]
                corr_collector = collections.defaultdict(list)
                for corr in corrs:
                    factor = self.__get_factor(r.factors, vis, spwid, ant_name, corr)
                    corr_collector[factor].append(corr)
                for factor, corrlist in corr_collector.items():
                    corr = ', '.join(corrlist)
                    jyperk = factor if factor is not None else 'N/A (1.0)'
                    if dovirtual:
                        row = JyperKTRV(vspwid, vis, spwid, ant_name, corr, jyperk)
                    else:
                        row = JyperKTR(spwid, vis, ant_name, corr, jyperk)
                    spw_factors[vspwid].append(row)
        reffile = r.reffile
        LOG.debug('reffile = {0}'.format(reffile))

    stage_dir = os.path.join(context.report_dir, 'stage%s' % ampcal_results.stage_number)

    # Copy the factor file next to the weblog page so that it can be linked.
    reffile_copied = None
    if reffile is not None and os.path.exists(reffile):
        shutil.copy2(reffile, stage_dir)
        reffile_copied = os.path.join(stage_dir, os.path.basename(reffile))

    # order table rows so that spw comes first
    row_values = [row for rows in spw_factors.values() for row in rows]

    ctx.update({'jyperk_rows': utils.merge_td_columns(row_values),
                'reffile': reffile_copied,
                'dovirtual': dovirtual})

    weblog_dir = os.path.join(context.report_dir,
                              'stage%s' % applycal_results.stage_number)
    LOG.debug('weblog_dir = {0}'.format(weblog_dir))

    flag_totals = {}
    calapps = {}
    caltypes = {}
    filesizes = {}
    for r in applycal_results:
        LOG.debug('r in applycal_results = {0}'.format(r))
        if r.inputs['flagsum']:
            flag_totals = utils.dict_merge(flag_totals, self.flags_for_result(r, context))
        calapps = utils.dict_merge(calapps, self.calapps_for_result(r))
        caltypes = utils.dict_merge(caltypes, self.caltypes_for_result(r))
        ms = context.observing_run.get_ms(r.inputs['vis'])
        filesizes[ms.basename] = ms._calc_filesize()
    LOG.debug('calapps = {0}'.format(calapps))
    LOG.debug('caltypes = {0}'.format(caltypes))

    # return all agents so we get ticks and crosses against each one
    agents = ['before', 'applycal']

    ctx.update({
        'flags': flag_totals,
        'calapps': calapps,
        'caltypes': caltypes,
        'agents': agents,
        'dirname': weblog_dir,
        'filesizes': filesizes
    })

    # CAS-5970: add science target plots to the applycal page
    (science_amp_vs_freq_summary_plots, uv_max) = self.create_single_dish_science_plots(context, applycal_results)

    ctx.update({
        'science_amp_vs_freq_plots': science_amp_vs_freq_summary_plots,
        'uv_max': uv_max,
    })
    LOG.debug('ctx = {0}'.format(ctx))

def _read_reffile_metadata(self, reffile):
    """
    Read the factor file and return its metadata entries.

    :param reffile: path of the factor file, or None
    :return: list of 'key:value' strings, or None when the file is missing,
        empty or holds no metadata lines
    """
    if reffile is None or not os.path.exists(reffile):
        LOG.warning('The factor file is not found in current directory: os.path.exists(reffile) = {0}'.format(False))
        return None
    LOG.info('os.path.exists(reffile) = {0}'.format(True))

    with open(reffile, 'r') as f:
        lines = f.readlines()
    if not lines:
        LOG.warning('The factor file is invalid format: size of reffile = {0}'.format(len(lines)))
        return None

    return self._parse_reffile_metadata(lines)

@staticmethod
def _parse_reffile_metadata(lines):
    """
    Extract the metadata found between the '#---Fill' and '#---End' lines.

    Lines whose first word contains ':' are taken as 'key:value' pairs.
    Every other non-blank line is free text that is collected, joined with
    '<br>', into a single trailing 'FreeMemo' entry.

    :param lines: lines of the factor file
    :return: list of 'key:value' strings, or None when no metadata line
        was found
    """
    # 1-based numbers of the last '#---Fill' / '#---End' lines; 0 if absent
    beginpoint = 0
    endpoint = 0
    for lineno, line in enumerate(lines, start=1):
        if line.startswith('#---Fill'):
            beginpoint = lineno
            LOG.debug('beginpoint = {0}'.format(beginpoint))
        if line.startswith('#---End'):
            endpoint = lineno
            LOG.debug('endpoint = {0}'.format(endpoint))

    # Lines strictly between the two markers. max() keeps the slice empty
    # when there is no end marker instead of wrapping around to index -1.
    pairs = []
    for line in lines[beginpoint:max(endpoint - 1, 0)]:
        text = line.replace('\r', '').replace('\n', '').replace('#', '').lstrip()
        if not text:
            LOG.debug('Skipped the blank line of the reffile.')
            continue
        if ':' in text.split()[0]:
            key, value = text.split(':', 1)
        else:
            # lines without a leading "key:" are regarded as FreeMemo values
            key, value = 'FreeMemo', text
        pairs.append((key, value))

    if not pairs:
        LOG.info('The factor file is invalid format. [No Data : No Data] is inserted instead of blank.')
        return None
    LOG.debug('metadata_tmp: {0}'.format([key + ':' + value for key, value in pairs]))

    # NOTE(review): the reviewed source had lost its indentation. This
    # assumes that only non-FreeMemo entries are appended one by one and
    # that all FreeMemo text goes into the single trailing entry - confirm.
    metadata = [key + ':' + value for key, value in pairs if 'FreeMemo' not in key]
    memo = ''.join(value + '<br>' for key, value in pairs if 'FreeMemo' in key)
    metadata.append('FreeMemo:' + memo)
    LOG.info('metadata: {0}'.format(metadata))
    return metadata
@staticmethod
def science_plots_for_result(context, result, plotter_cls, fields, uvrange=None, renderer_cls=None):
    """
    Create TARGET plots for each of the given fields.

    :param context: pipeline Context
    :param result: result whose stage directory receives the plots
    :param plotter_cls: plotter class, instantiated once per field
    :param fields: fields to plot, one plotter call per entry
    :param uvrange: optional uvrange passed to the plotter
    :param renderer_cls: optional renderer class used to write a page
        holding all the plots
    :return: list of the plots created for all fields
    """
    plot_kwargs = {'coloraxis': 'spw'}
    if uvrange is not None:
        # CAS-9395: ALMA pipeline weblog plot of calibrated amp vs.
        # frequency with avgantenna=True and a uvrange upper limit leads
        # to misleading results and wrong conclusions
        plot_kwargs.update(uvrange=uvrange, avgantenna=False)

    stage_dir = os.path.join(context.report_dir, 'stage%s' % result.stage_number)
    calto, _ = _get_data_selection_for_plot(context, result, ['TARGET'])

    target_plots = []
    for field_id in fields:
        # override field when plotting amp/phase vs frequency, as otherwise
        # the field is resolved to a list of all field IDs
        plot_kwargs['field'] = field_id
        field_plotter = plotter_cls(context, stage_dir, calto, 'TARGET', **plot_kwargs)
        target_plots.extend(field_plotter.plot())

    for target_plot in target_plots:
        target_plot.parameters['intent'] = ['TARGET']

    if renderer_cls is None:
        return target_plots

    page = renderer_cls(context, result, target_plots)
    with page.get_file() as fileobj:
        fileobj.write(page.render())
    return target_plots
def create_single_dish_science_plots(self, context, results):
    """
    Create plots for the science targets (single dish variant).

    :param context: pipeline Context
    :param results: applycal results, one per measurement set
    :return: tuple of two dictionaries keyed by vis basename: the summary
        plots per vis, and a zero-metre measures.Distance per vis
    """
    summary_plots_by_vis = collections.defaultdict(dict)
    max_uv_by_vis = collections.defaultdict(dict)
    detail_plots_by_vis = {}

    for result in results:
        vis = os.path.basename(result.inputs['vis'])
        ms = context.observing_run.get_ms(vis)

        max_uv_by_vis[vis] = measures.Distance(value=0.0, units=measures.DistanceUnits.METRE)
        summary_plots_by_vis[vis] = []

        # Plot for 1 science field (either 1 science target or for a mosaic 1
        # pointing). The science field that should be chosen is the one with
        # the brightest average amplitude over all spws
        source_name, _ = ms.get_representative_source_spw()
        representative_source = {src for src in ms.sources if src.name == source_name}
        if representative_source:
            representative_source = representative_source.pop()

        brightest_field = get_brightest_field(ms, representative_source)
        summary_plots = self.science_plots_for_result(
            context, result, applycal.RealVsFrequencySummaryChart,
            [brightest_field.id], None)
        for summary_plot in summary_plots:
            summary_plot.parameters['source'] = representative_source
        summary_plots_by_vis[vis].extend(summary_plots)

        if not pipeline.infrastructure.generate_detail_plots(result):
            continue

        with casa_tools.MSMDReader(vis) as msmd:
            target_fields = set(msmd.fieldsforintent("OBSERVE_TARGET#ON_SOURCE"))

        # Science target detail plots. Note that summary plots go onto the
        # detail pages; we don't create plots per spw or antenna
        detail_plots_by_vis[vis] = self.science_plots_for_result(
            context, result, applycal.RealVsFrequencySummaryChart,
            target_fields, None, ApplycalAmpVsFreqSciencePlotRenderer)

    if detail_plots_by_vis:
        # one page holding the detail plots of every vis; it is built with
        # the last result of the loop above
        all_plots = list(utils.flatten(list(detail_plots_by_vis.values())))
        page = ApplycalAmpVsFreqSciencePlotRenderer(context, result, all_plots)
        with page.get_file() as fileobj:
            fileobj.write(page.render())

    return summary_plots_by_vis, max_uv_by_vis
@staticmethod
def __get_factor(factor_dict, vis, spwid, ant_name, pol_name):
    """
    Look up factor_dict[vis][spwid][ant_name][pol_name].

    :return: the stored factor, or None as soon as one of the four keys is
        missing at its level
    """
    node = factor_dict
    for key in (vis, spwid, ant_name, pol_name):
        # membership test rather than try/except: this never inserts a key
        # into the mapping while descending
        if key not in node:
            return None
        node = node[key]
    return node
def plots_for_result(self, context, result, plotter_cls, intents, renderer_cls=None, **kwargs):
    """
    Create plots for one result and optionally render a page for them.

    :param context: pipeline Context
    :param result: result whose stage directory receives the plots
    :param plotter_cls: plotter class to instantiate
    :param intents: intents to plot; also stored on every plot
    :param renderer_cls: optional renderer class for a detail page
    :param kwargs: extra keyword arguments passed to plotter_cls
    :return: tuple ({vis basename: plots}, path of the rendered page or None)
    """
    vis_name = os.path.basename(result.inputs['vis'])
    stage_dir = os.path.join(context.report_dir, 'stage%s' % result.stage_number)
    calto, str_intents = _get_data_selection_for_plot(context, result, intents)

    created = plotter_cls(context, stage_dir, calto, str_intents, **kwargs).plot()
    for created_plot in created:
        created_plot.parameters['intent'] = intents
    plots_by_vis = {vis_name: created}

    if renderer_cls is None:
        return plots_by_vis, None

    page = renderer_cls(context, result, created)
    with page.get_file() as fileobj:
        fileobj.write(page.render())
    return plots_by_vis, page.path
def calapps_for_result(self, result):
    """
    Group the applied calibrations of a result by measurement set.

    :param result: result with an .applied iterable of CalApplications
    :return: defaultdict mapping vis basename to the list of its calapps
    """
    by_vis = collections.defaultdict(list)
    for applied in result.applied:
        by_vis[os.path.basename(applied.vis)].append(applied)
    return by_vis
def caltypes_for_result(self, result):
    """
    Map each applied caltable to the label shown for its calibration type.

    :param result: result with an .applied iterable of CalApplications
    :return: dict mapping gaintable name to its display label; caltypes
        without a dedicated label are shown as they are
    """
    labels = {
        'bandpass': 'Bandpass',
        'gaincal': 'Gain',
        'tsys': 'T<sub>sys</sub>',
        'wvr': 'WVR',
        'ps': 'Sky',
    }

    label_by_table = {}
    for applied in result.applied:
        for calfrom in applied.calfrom:
            label = labels.get(calfrom.caltype, calfrom.caltype)
            if calfrom.caltype == 'gaincal':
                # try heuristics to detect phase-only and amp-only
                # solutions
                label += self.get_gain_solution_type(calfrom.gaintable)
            label_by_table[calfrom.gaintable] = label
    return label_by_table
def get_gain_solution_type(self, gaintable):
    """
    Return a label suffix telling whether a gain table is amp- or phase-only.

    :param gaintable: name of the gain caltable
    :return: ' (amplitude only)', ' (phase only)' or ''
    """
    # CAS-9835: hif_applycal() "type" descriptions are misleading /
    # incomplete in weblog table
    #
    # quick hack: match filenamer-generated file names
    #
    # TODO find a way to attach the originating task to the callibrary entries
    known_suffixes = (
        ('.gacal.tbl', ' (amplitude only)'),
        ('.gpcal.tbl', ' (phase only)'),
        ('.gcal.tbl', ''),
    )
    for suffix, label in known_suffixes:
        if gaintable.endswith(suffix):
            return label

    # resort to inspecting caltable values to infer what its type is

    # solve circular import problem by importing at run-time
    from pipeline.infrastructure import casa_tasks

    def cparam_stats(axis):
        # run calstat on the CPARAM column of the table for the given axis
        job = casa_tasks.calstat(caltable=gaintable, axis=axis,
                                 datacolumn='CPARAM', useflags=True)
        return job.execute(dry_run=False)['CPARAM']

    # amp solutions of unity imply phase-only was requested
    amp = cparam_stats('amp')
    amp_tol = 1e-3
    no_amp_soln = all([utils.approx_equal(amp['sum'], amp['npts'], amp_tol),
                       utils.approx_equal(amp['min'], 1, amp_tol),
                       utils.approx_equal(amp['max'], 1, amp_tol)])

    # phase solutions ~ 0 implies amp-only solution
    phase = cparam_stats('phase')
    phase_tol = 1e-5
    no_phase_soln = all([utils.approx_equal(phase['sum'], 0, phase_tol),
                         utils.approx_equal(phase['min'], 0, phase_tol),
                         utils.approx_equal(phase['max'], 0, phase_tol)])

    # both or neither present: nothing specific to report
    if no_amp_soln == no_phase_soln:
        return ''
    return ' (amplitude only)' if no_phase_soln else ' (phase only)'
def flags_for_result(self, result, context):
    """
    Collect the flagging totals of one result.

    :param result: result providing inputs['vis'] and .summaries
    :param context: pipeline Context
    :return: {ms basename: per-intent totals merged with the science spw totals}
    """
    ms = context.observing_run.get_ms(result.inputs['vis'])
    flag_summaries = result.summaries

    intent_totals = self.flags_by_intent(ms, flag_summaries)
    spw_totals = self.flags_by_science_spws(ms, flag_summaries)
    merged_totals = utils.dict_merge(intent_totals, spw_totals)

    return {ms.basename: merged_totals}
def flags_by_intent(self, ms, summaries):
    """
    Tally flagged and total counts per observing intent for each summary.

    The flagged count of a summary is relative to the summary before it:
    what the previous summary already reported as flagged is subtracted.
    Scans missing from a summary contribute nothing; scans missing from
    the previous summary have nothing subtracted.

    :param ms: measurement set domain object providing .scans
    :param summaries: flagging summary dictionaries, in order
    :return: defaultdict mapping summary name to {intent: FlagTotal}, with
        an extra 'TOTAL' entry covering all scans
    """
    # create a dictionary of scans per observing intent, eg. 'PHASE':[1,2,7]
    # IDs are converted to strings as they're used as summary dictionary keys
    intent_scans = {
        intent: [str(s.id) for s in ms.scans if intent in s.intents]
        for intent in ('BANDPASS', 'PHASE', 'AMPLITUDE', 'CHECK', 'TARGET')
    }
    # the total flagged is obtained by looking in all scans
    intent_scans['TOTAL'] = [str(s.id) for s in ms.scans]

    total = collections.defaultdict(dict)

    previous_summary = None
    for summary in summaries:
        for intent, scan_ids in intent_scans.items():
            flagcount = 0
            totalcount = 0

            for i in scan_ids:
                # workaround for KeyError exception when summary
                # dictionary doesn't contain the scan
                if i not in summary['scan']:
                    continue

                flagcount += int(summary['scan'][i]['flagged'])
                totalcount += int(summary['scan'][i]['total'])

                # the same workaround is needed for the previous summary
                if previous_summary and i in previous_summary['scan']:
                    flagcount -= int(previous_summary['scan'][i]['flagged'])

            total[summary['name']][intent] = FlagTotal(flagcount, totalcount)

        previous_summary = summary

    return total
def flags_by_science_spws(self, ms, summaries):
    """
    Tally flagged and total counts over the science spws for each summary.

    The flagged count of a summary is relative to the summary before it.
    As in flags_by_intent, spws missing from a summary contribute nothing
    and spws missing from the previous summary have nothing subtracted.

    :param ms: measurement set domain object
    :param summaries: flagging summary dictionaries, in order
    :return: defaultdict mapping summary name to {'SCIENCE SPWS': FlagTotal}
    """
    science_spws = ms.get_spectral_windows(science_windows_only=True)
    # spw IDs are converted to strings as they're used as summary keys
    spw_ids = [str(spw.id) for spw in science_spws]

    total = collections.defaultdict(dict)

    previous_summary = None
    for summary in summaries:
        flagcount = 0
        totalcount = 0

        for spw_id in spw_ids:
            # avoid a KeyError when the summary doesn't contain the spw
            if spw_id not in summary['spw']:
                continue

            flagcount += int(summary['spw'][spw_id]['flagged'])
            totalcount += int(summary['spw'][spw_id]['total'])

            if previous_summary and spw_id in previous_summary['spw']:
                flagcount -= int(previous_summary['spw'][spw_id]['flagged'])

        total[summary['name']]['SCIENCE SPWS'] = FlagTotal(flagcount, totalcount)

        previous_summary = summary

    return total
class ApplycalAmpVsFreqPlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of calibrated amplitude vs frequency plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated amplitude vs frequency for %s' % ms_name
        page_file = filenamer.sanitize('amp_vs_freq-%s.html' % ms_name)
        template = 'generic_x_vs_y_field_spw_ant_detail_plots.mako'

        super(ApplycalAmpVsFreqPlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
class ApplycalPhaseVsFreqPlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of calibrated phase vs frequency plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated phase vs frequency for %s' % ms_name
        page_file = filenamer.sanitize('phase_vs_freq-%s.html' % ms_name)
        template = 'generic_x_vs_y_field_spw_ant_detail_plots.mako'

        super(ApplycalPhaseVsFreqPlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
class ApplycalAmpVsFreqSciencePlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of science-target calibrated amplitude vs frequency plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated amplitude vs frequency for %s' % ms_name
        page_file = filenamer.sanitize('science_amp_vs_freq-%s.html' % ms_name)
        template = 'generic_x_vs_y_spw_field_detail_plots.mako'

        super(ApplycalAmpVsFreqSciencePlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
class ApplycalAmpVsUVSciencePlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of science-target calibrated amplitude vs UV distance plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated amplitude vs UV distance for %s' % ms_name
        page_file = filenamer.sanitize('science_amp_vs_uv-%s.html' % ms_name)
        template = 'generic_x_vs_y_spw_field_detail_plots.mako'

        super(ApplycalAmpVsUVSciencePlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
class ApplycalAmpVsUVPlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of calibrated amplitude vs UV distance plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated amplitude vs UV distance for %s' % ms_name
        page_file = filenamer.sanitize('amp_vs_uv-%s.html' % ms_name)
        template = 'generic_x_vs_y_field_spw_ant_detail_plots.mako'

        super(ApplycalAmpVsUVPlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
class ApplycalPhaseVsUVPlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of calibrated phase vs UV distance plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated phase vs UV distance for %s' % ms_name
        page_file = filenamer.sanitize('phase_vs_uv-%s.html' % ms_name)
        template = 'generic_x_vs_y_spw_ant_plots.mako'

        super(ApplycalPhaseVsUVPlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
class ApplycalAmpVsTimePlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of calibrated amplitude vs time plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated amplitude vs times for %s' % ms_name
        page_file = filenamer.sanitize('amp_vs_time-%s.html' % ms_name)
        template = 'generic_x_vs_y_spw_ant_plots.mako'

        super(ApplycalAmpVsTimePlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
class ApplycalPhaseVsTimePlotRenderer(basetemplates.JsonPlotRenderer):
    """Page of calibrated phase vs time plots for the vis of the given plots."""

    def __init__(self, context, result, plots):
        ms_name = utils.get_vis_from_plots(plots)
        page_title = 'Calibrated phase vs times for %s' % ms_name
        page_file = filenamer.sanitize('phase_vs_time-%s.html' % ms_name)
        template = 'generic_x_vs_y_field_spw_ant_detail_plots.mako'

        super(ApplycalPhaseVsTimePlotRenderer, self).__init__(
            template, context, result, plots, page_title, page_file)
def _get_data_selection_for_plot(context, result, intent):
    """
    Inspect a result, returning a CalTo that matches the data selection of the
    applied calibration.

    Background: we don't want to create plots for an entire MS, only the data
    selection of interest. Rather than calculate and explicitly pass in the
    data selection of interest, this function calculates the data selection of
    interest by inspecting the results and extracting the data selection that
    the calibration is applied to.

    :param context: pipeline Context
    :param result: a Result with an .applied property containing CalApplications
    :param intent: iterable of pipeline intent names
    :return: tuple (CalTo, intent string), where the intent string is the
        upper-cased, comma-separated form of the requested intents
    :raises AssertionError: if the calibrations were not applied to exactly
        one vis
    """
    spw = _get_calapp_arg(result, 'spw')
    field = _get_calapp_arg(result, 'field')
    antenna = _get_calapp_arg(result, 'antenna')
    intent = ','.join(intent).upper()

    vis = {calapp.vis for calapp in result.applied}
    # compare by value: 'is' would test object identity against a literal
    assert len(vis) == 1, 'expected calibrations applied to exactly one vis, got %d' % len(vis)
    vis = vis.pop()

    # keep only the selected fields that were observed with a wanted intent
    wanted = set(intent.split(','))
    fields_with_intent = {f.name
                          for f in context.observing_run.get_ms(vis).get_fields(field)
                          if f.intents.intersection(wanted)}
    # sorted so that the selection string does not depend on set ordering
    field = ','.join(sorted(fields_with_intent))

    calto = callibrary.CalTo(vis, field, spw, antenna, intent)

    return calto, intent
def _get_calapp_arg(result, arg):
    """
    Collect the distinct values of one attribute over all applied calibrations.

    :param result: a Result with an .applied property containing CalApplications
    :param arg: name of the CalApplication attribute to read, e.g. 'spw'
    :return: comma-separated string of the distinct values, in sorted order
    """
    values = set()
    for calapp in result.applied:
        values.update(utils.safe_split(getattr(calapp, arg)))
    # sorted so that the result does not depend on set ordering
    return ','.join(sorted(values))