Source code for pipeline.hif.tasks.tclean.display

import collections
import copy
import os

import matplotlib

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.renderer.logger as logger
from pipeline.h.tasks.common.displays import sky as sky
from pipeline.infrastructure import casa_tools
from .plot_spectra import plot_spectra

LOG = infrastructure.get_logger(__name__)

# class used to transfer image statistics through to plotting routines
ImageStats = collections.namedtuple('ImageStats', 'rms max')


[docs]class CleanSummary(object): def __init__(self, context, result, image_stats): self.context = context self.result = result self.image_stats = image_stats
[docs] def plot(self): stage_dir = os.path.join(self.context.report_dir, 'stage%d' % self.result.stage_number) if not os.path.exists(stage_dir): os.mkdir(stage_dir) LOG.trace('Plotting') plot_wrappers = [] # this class can handle a list of results from hif_cleanlist, or a single # result from hif_clean if we make the single result from the latter # the member of a list if hasattr(self.result, 'results'): results = self.result.results else: results = [self.result] for r in results: if r.empty(): continue extension = '.tt0' if r.multiterm else '' # psf map plot_wrappers.append(sky.SkyDisplay().plot(self.context, r.psf + extension, reportdir=stage_dir, intent=r.intent, collapseFunction='mean')) # flux map plot_wrappers.append(sky.SkyDisplay().plot(self.context, r.flux + extension, reportdir=stage_dir, intent=r.intent, collapseFunction='mean')) for i, iteration in [(k, r.iterations[k]) for k in sorted(r.iterations)]: # process image for this iteration if 'image' in iteration: collapse_function = 'max' if (('cube' in iteration.get('image', '')) or ('repBW' in iteration.get('image', ''))) else 'mean' # PB corrected image_path = iteration['image'].replace('.image', '.image%s' % (extension)) plot_wrappers.append( sky.SkyDisplay().plot(self.context, image_path, reportdir=stage_dir, intent=r.intent, collapseFunction=collapse_function)) # Non PB corrected image_path = image_path.replace('.pbcor', '') # CAS-8847: For the non-pbcor MOM8 image displayed in the weblog set the color range to: +1 # sigmaCube to max([peakCube,+8sigmaCube]). The pbcor MOM8 images are ok as is for now. extra_args = {} if collapse_function == 'max': if image_path not in self.image_stats: LOG.trace('No cached image statistics found for {!s}'.format(image_path)) with casa_tools.ImageReader(image_path) as image: stats = image.statistics(robust=False) image_rms = stats.get('rms')[0] image_max = stats.get('max')[0] self.image_stats[image_path] = ImageStats(rms=image_rms, max=image_max) image_stats = self.image_stats[image_path] image_rms = image_stats.rms image_max = image_stats.max extra_args = { 'vmin': image_rms, 'vmax': max([image_max, 8*image_rms]) } plot_wrappers.append( sky.SkyDisplay().plot(self.context, image_path, reportdir=stage_dir, intent=r.intent, collapseFunction=collapse_function, **extra_args)) # residual for this iteration plot_wrappers.append( sky.SkyDisplay().plot(self.context, iteration['residual'] + extension, reportdir=stage_dir, intent=r.intent)) # model for this iteration (currently only last but allow for others in future) if 'model' in iteration and os.path.exists(iteration['model'] + extension): plot_wrappers.append( sky.SkyDisplay().plot(self.context, iteration['model'] + extension, reportdir=stage_dir, intent=r.intent, **{'cmap': copy.deepcopy(matplotlib.cm.seismic)})) # MOM0_FC for this iteration (currently only last but allow for others in future). if 'mom0_fc' in iteration and os.path.exists(iteration['mom0_fc'] + extension): plot_wrappers.append( sky.SkyDisplay().plot(self.context, iteration['mom0_fc'] + extension, reportdir=stage_dir, intent=r.intent)) # MOM8_FC for this iteration (currently only last but allow for others in future). if 'mom8_fc' in iteration and os.path.exists(iteration['mom8_fc'] + extension): # PIPE-197: For MOM8_FC image displayed in the weblog set the color range # from (median-MAD) to 10 * sigma, where sigma is from the annulus minus cleanmask # masked mom8_fc image and median, MAD are from the unmasked mom8_fc image if iteration['mom8_fc_cube_sigma'] is not None: extra_args = { 'vmin': iteration['mom8_fc_image_median_all'] - iteration['mom8_fc_image_mad'], 'vmax': 10 * iteration['mom8_fc_cube_sigma'], 'mom8_fc_peak_snr': iteration['mom8_fc_peak_snr'] } # in case min >= max, set min=image_min and max=image_max if extra_args['vmin'] >= extra_args['vmax']: extra_args['vmin'] = iteration['mom8_fc_image_min'] extra_args['vmax'] = iteration['mom8_fc_image_max'] else: extra_args = {} plot_wrappers.append( sky.SkyDisplay().plot(self.context, iteration['mom8_fc'] + extension, reportdir=stage_dir, intent=r.intent, **extra_args)) # cleanmask - not for iter 0 if i > 0: collapse_function = 'max' if (('cube' in iteration.get('cleanmask', '')) or ('repBW' in iteration.get('cleanmask', ''))) else 'mean' plot_wrappers.append( sky.SkyDisplay().plot(self.context, iteration.get('cleanmask', ''), reportdir=stage_dir, intent=r.intent, collapseFunction=collapse_function, **{'cmap': copy.deepcopy(matplotlib.cm.YlOrRd)})) # cube spectra for this iteration if ('cube' in iteration.get('image', '')) or ('repBW' in iteration.get('image', '')): imagename = r.image_robust_rms_and_spectra['nonpbcor_imagename'] with casa_tools.ImageReader(imagename) as image: miscinfo = image.miscinfo() parameters = {k: miscinfo[k] for k in ['spw', 'iter'] if k in miscinfo} parameters['field'] = '%s (%s)' % (miscinfo['field'], miscinfo['intent']) parameters['type'] = 'spectra' try: parameters['prefix'] = miscinfo['filnam01'] except: parameters['prefix'] = None virtual_spw = parameters['spw'] imaging_mss = [m for m in self.context.observing_run.measurement_sets if m.is_imaging_ms] if imaging_mss != []: ref_ms = imaging_mss[0] else: ref_ms = self.context.observing_run.measurement_sets[0] real_spw = self.context.observing_run.virtual2real_spw_id(virtual_spw, ref_ms) real_spw_obj = ref_ms.get_spectral_window(real_spw) if real_spw_obj.receiver is not None and real_spw_obj.freq_lo is not None: rec_info = {'type': real_spw_obj.receiver, 'LO1': real_spw_obj.freq_lo[0].str_to_precision(12)} else: LOG.warn('Could not determine receiver type. Assuming TSB.') rec_info = {'type': 'TSB', 'LO1': '0GHz'} plotfile = '%s.spectrum.png' % (os.path.join(stage_dir, os.path.basename(imagename))) plot_spectra(r.image_robust_rms_and_spectra, rec_info, plotfile) plot_wrappers.append(logger.Plot(plotfile, parameters=parameters)) return [p for p in plot_wrappers if p is not None]