Source code for pipeline.infrastructure.displays.pointing

import math
import os

import numpy
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter, MultipleLocator, AutoLocator

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.renderer.logger as logger
from pipeline.domain.datatable import DataTableImpl as DataTable
from pipeline.domain.datatable import OnlineFlagIndex
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure.displays.plotstyle import casa5style_plot

LOG = infrastructure.get_logger(__name__)

RArotation = 90
DECrotation = 0

DPISummary = 90

dsyb = '$^\circ$'
hsyb = ':'
msyb = ':'


[docs]def Deg2HMS(x, prec=0): """ Converts an angle in degree to hour angle and returns a list of strings of hour, minute, and second values in a specified precision, e.g., ['01', '02', '23.4'] for '01:02:23.4' """ # Transform degree to HHMMSS.sss format xx = x % 360 cqa = casa_tools.quanta angle = cqa.angle(cqa.quantity(xx, 'deg'), prec=prec, form=['time'])[0] return angle.split(':')
# NOTE: a parameter 'pos' # HHMM* and DDMM* functions are used to set axis formatter of matplotlib plots. # The functions will be turned into matplotlib.ticker.FuncFormatter. # The function should take two inputs, a tick value x and position pos. # see also: https://matplotlib.org/3.3.0/api/ticker_api.html#matplotlib.ticker.FuncFormatter
[docs]def HHMM(x, pos=None): # HHMM format (h, m, s) = Deg2HMS(x, prec=6) return '%s%s%s' % (h, hsyb, m)
def __format_hms(x, prec=0): (h, m, s) = Deg2HMS(x, prec) return '%s%s%s%s%s' % (h, hsyb, m, msyb, s)
[docs]def HHMMSS(x, pos=None): # HHMMSS format return __format_hms(x, prec=6)
[docs]def HHMMSSs(x, pos=None): # HHMMSS.s format return __format_hms(x, prec=7)
[docs]def HHMMSSss(x, pos=None): # HHMMSS.ss format return __format_hms(x, prec=8)
[docs]def HHMMSSsss(x, pos=None): # HHMMSS.sss format return __format_hms(x, prec=9)
[docs]def Deg2DMS(x, prec=0): """ Converts an angle in degree to dms angle (ddmmss.s) and returns a list of strings of degree, arcminute, and arcsecond values in a specified precision, e.g., ['+01', '02', '23.4'] for '+01.02.23.4'. Note dgree string is always associated with a sign. """ # Transform degree to +ddmmss.ss format xxx = (x + 90) % 180 - 90 xx = abs(xxx) sign = '-' if xxx < 0 else '+' cqa = casa_tools.quanta dms_angle = cqa.angle(cqa.quantity(xx, 'deg'), prec=prec)[0] seg = dms_angle.split('.') assert len(seg) < 5 and len(seg) > 0 # force degree in %02d format and add sign. qa.angle always retrun positive angle seg[0] = ( '%s%02d' % (sign, int(seg[0][1:])) ) if len(seg) == 4: return (seg[0], seg[1], str('.').join(seg[2:])) else: return seg
[docs]def DDMM(x, pos=None): # +DDMM format (d, m, s) = Deg2DMS(x, prec=6) return '%s%s%s\'' % (d, dsyb, m)
def __format_dms(x, prec=0): (d, m, s) = Deg2DMS(x, prec) # format desimal part of arcsec value separately xx = s.split('.') s = xx[0] ss = '' if len(xx) ==1 else '.%s' % xx[1] return '%s%s%s\'%s\"%s' % (d, dsyb, m, s, ss)
[docs]def DDMMSS(x, pos=None): # +DDMMSS format return __format_dms(x, prec=6)
[docs]def DDMMSSs(x, pos=None): # +DDMMSS.s format return __format_dms(x, prec=7)
[docs]def DDMMSSss(x, pos=None): # +DDMMSS.ss format return __format_dms(x, prec=8)
[docs]def XYlabel(span, direction_reference, ofs_coord=False): if direction_reference.upper() == 'GALACTIC': return GLGBlabel(span) else: return RADEClabel(span, ofs_coord)
[docs]def GLGBlabel(span): """ return (GLlocator, GBlocator, GLformatter, GBformatter) for Galactic coordinate """ # RAtick = [15.0, 5.0, 2.5, 1.25, 1/2.0, 1/4.0, 1/12.0, 1/24.0, 1/48.0, 1/120.0, 1/240.0, 1/480.0, 1/1200.0, # 1/2400.0, 1/4800.0, 1/12000.0, 1/24000.0, 1/48000.0, -1.0] XYtick = [20.0, 10.0, 5.0, 2.0, 1.0, 1/3.0, 1/6.0, 1/12.0, 1/30.0, 1/60.0, 1/180.0, 1/360.0, 1/720.0, 1/1800.0, 1/3600.0, 1/7200.0, 1/18000.0, 1/36000.0, -1.0] #for RAt in RAtick: # if span > (RAt * 3.0) and RAt > 0: # RAlocator = MultipleLocator(RAt) # break #if RAt < 0: RAlocator = MultipleLocator(1/96000.) #if RAt < 0: RAlocator = AutoLocator() for t in XYtick: if span > (t * 3.0) and t > 0: GLlocator = MultipleLocator(t) GBlocator = MultipleLocator(t) break #if DECt < 0: DEClocator = MultipleLocator(1/72000.0) if t < 0: GLlocator = AutoLocator() GBlocator = AutoLocator() if span < 0.0001: GLformatter=FuncFormatter(DDMMSSss) GBformatter=FuncFormatter(DDMMSSss) elif span < 0.001: GLformatter=FuncFormatter(DDMMSSs) GBformatter=FuncFormatter(DDMMSSs) elif span < 0.01: GLformatter=FuncFormatter(DDMMSS) GBformatter=FuncFormatter(DDMMSS) elif span < 1.0: GLformatter=FuncFormatter(DDMMSS) #GBformatter=FuncFormatter(DDMM) GBformatter=FuncFormatter(DDMMSS) else: GLformatter=FuncFormatter(DDMM) GBformatter=FuncFormatter(DDMM) return (GLlocator, GBlocator, GLformatter, GBformatter)
[docs]def RADEClabel(span, ofs_coord): """ return (RAlocator, DEClocator, RAformatter, DECformatter) """ RAtick = [15.0, 5.0, 2.5, 1.25, 1/2.0, 1/4.0, 1/12.0, 1/24.0, 1/48.0, 1/120.0, 1/240.0, 1/480.0, 1/1200.0, 1/2400.0, 1/4800.0, 1/12000.0, 1/24000.0, 1/48000.0, -1.0] DECtick = [20.0, 10.0, 5.0, 2.0, 1.0, 1/3.0, 1/6.0, 1/12.0, 1/30.0, 1/60.0, 1/180.0, 1/360.0, 1/720.0, 1/1800.0, 1/3600.0, 1/7200.0, 1/18000.0, 1/36000.0, -1.0] for RAt in RAtick: if span > (RAt * 3.0) and RAt > 0: RAlocator = MultipleLocator(RAt) break #if RAt < 0: RAlocator = MultipleLocator(1/96000.) if RAt < 0: RAlocator = AutoLocator() for DECt in DECtick: if span > (DECt * 3.0) and DECt > 0: DEClocator = MultipleLocator(DECt) break #if DECt < 0: DEClocator = MultipleLocator(1/72000.0) if DECt < 0: DEClocator = AutoLocator() if span < 0.0001: if ofs_coord: RAformatter=FuncFormatter(DDMMSSss) else: RAformatter=FuncFormatter(HHMMSSsss) DECformatter=FuncFormatter(DDMMSSss) elif span < 0.001: if ofs_coord: RAformatter=FuncFormatter(DDMMSSs) else: RAformatter=FuncFormatter(HHMMSSss) DECformatter=FuncFormatter(DDMMSSs) elif span < 0.01: if ofs_coord: RAformatter=FuncFormatter(DDMMSS) else: RAformatter=FuncFormatter(HHMMSSs) DECformatter=FuncFormatter(DDMMSS) elif span < 1.0: if ofs_coord: RAformatter=FuncFormatter(DDMMSS) else: RAformatter=FuncFormatter(HHMMSS) #DECformatter=FuncFormatter(DDMM) DECformatter=FuncFormatter(DDMMSS) else: if ofs_coord: RAformatter=FuncFormatter(DDMM) else: RAformatter=FuncFormatter(HHMM) DECformatter=FuncFormatter(DDMM) return (RAlocator, DEClocator, RAformatter, DECformatter)
[docs]class MapAxesManagerBase(object): @property def direction_reference(self): return self._direction_reference @direction_reference.setter def direction_reference(self, value): if isinstance(value, str): self._direction_reference = value @property def ofs_coord(self): return self._ofs_coord @ofs_coord.setter def ofs_coord(self, value): if isinstance(value, bool): self._ofs_coord = value def __init__(self): self._direction_reference = None self._ofs_coord = None
[docs] def get_axes_labels(self): # default label is RA/Dec xlabel = 'RA' ylabel = 'Dec' if isinstance(self.direction_reference, str): if self.direction_reference in ['J2000', 'ICRS']: if self.ofs_coord: xlabel = 'Offset-RA ({0})'.format(self.direction_reference) ylabel = 'Offset-Dec ({0})'.format(self.direction_reference) else: xlabel = 'RA ({0})'.format(self.direction_reference) ylabel = 'Dec ({0})'.format(self.direction_reference) elif self.direction_reference.upper() == 'GALACTIC': xlabel = 'GL' ylabel = 'GB' return xlabel, ylabel
[docs]class PointingAxesManager(MapAxesManagerBase): MATPLOTLIB_FIGURE_ID = 9005 @property def direction_reference(self): return self._direction_reference @direction_reference.setter def direction_reference(self, value): if isinstance(value, str): self._direction_reference = value @property def ofs_coord(self): return self._ofs_coord @ofs_coord.setter def ofs_coord(self, value): if isinstance(value, bool): self._ofs_coord = value def __init__(self): self._axes = None self.is_initialized = False self._direction_reference = None self._ofs_coord = None
[docs] def init_axes(self, xlocator, ylocator, xformatter, yformatter, xrotation, yrotation, aspect, xlim=None, ylim=None, reset=False): if self._axes is None: self._axes = self.__axes() if xlim is not None: self._axes.set_xlim(xlim) if ylim is not None: self._axes.set_ylim(ylim) if self.is_initialized == False or reset: # 2008/9/20 DEC Effect self._axes.set_aspect(aspect) self._axes.xaxis.set_major_formatter(xformatter) self._axes.yaxis.set_major_formatter(yformatter) self._axes.xaxis.set_major_locator(xlocator) self._axes.yaxis.set_major_locator(ylocator) xlabels = self._axes.get_xticklabels() plt.setp(xlabels, 'rotation', xrotation, fontsize=8) ylabels = self._axes.get_yticklabels() plt.setp(ylabels, 'rotation', yrotation, fontsize=8)
@property def axes(self): if self._axes is None: self._axes = self.__axes() return self._axes def __axes(self): a = plt.axes([0.15, 0.2, 0.7, 0.7]) xlabel, ylabel = self.get_axes_labels() plt.xlabel(xlabel) plt.ylabel(ylabel) plt.title('') return a
[docs]def draw_beam(axes, r, aspect, x_base, y_base, offset=1.0): xy = numpy.array([[r * (math.sin(t * 0.13) + offset) * aspect + x_base, r * (math.cos(t * 0.13) + offset) + y_base] for t in range(50)]) plt.gcf().sca(axes) line = plt.plot(xy[:, 0], xy[:, 1], 'r-') return line[0]
[docs]def draw_pointing(axes_manager, RA, DEC, FLAG=None, plotfile=None, connect=True, circle=[], ObsPattern=False, plotpolicy='ignore'): span = max(max(RA) - min(RA), max(DEC) - min(DEC)) xmax = min(RA) - span / 10.0 xmin = max(RA) + span / 10.0 ymax = max(DEC) + span / 10.0 ymin = min(DEC) - span / 10.0 (RAlocator, DEClocator, RAformatter, DECformatter) = XYlabel(span, axes_manager.direction_reference, ofs_coord=axes_manager.ofs_coord) Aspect = 1.0 / math.cos(DEC[0] / 180.0 * 3.141592653) # Plotting routine if connect is True: Mark = 'g-o' else: Mark = 'bo' axes_manager.init_axes(RAlocator, DEClocator, RAformatter, DECformatter, RArotation, DECrotation, Aspect, xlim=(xmin, xmax), ylim=(ymin, ymax)) a = axes_manager.axes if ObsPattern == False: a.title.set_text('Telescope Pointing on the Sky') else: a.title.set_text('Telescope Pointing on the Sky\nPointing Pattern = %s' % ObsPattern) plot_objects = [] if plotpolicy == 'plot': # Original plot_objects.extend( plt.plot(RA, DEC, Mark, markersize=2, markeredgecolor='b', markerfacecolor='b') ) elif plotpolicy == 'ignore': # Ignore Flagged Data filter = FLAG == 1 plot_objects.extend( plt.plot(RA[filter], DEC[filter], Mark, markersize=2, markeredgecolor='b', markerfacecolor='b') ) elif plotpolicy == 'greyed': # Change Color if connect is True: plot_objects.extend(plt.plot(RA, DEC, 'g-')) filter = FLAG == 1 plot_objects.extend( plt.plot(RA[filter], DEC[filter], 'o', markersize=2, markeredgecolor='b', markerfacecolor='b') ) filter = FLAG == 0 if numpy.any(filter == True): plot_objects.extend( plt.plot(RA[filter], DEC[filter], 'o', markersize=2, markeredgecolor='grey', markerfacecolor='grey') ) # plot starting position with beam and end position if len(circle) != 0: plot_objects.append( draw_beam(a, circle[0], Aspect, RA[0], DEC[0], offset=0.0) ) Mark = 'ro' plot_objects.extend( plt.plot(RA[-1], DEC[-1], Mark, markersize=4, markeredgecolor='r', markerfacecolor='r') ) plt.axis([xmin, xmax, ymin, ymax]) if plotfile is not None: plt.savefig(plotfile, format='png', dpi=DPISummary) for obj in plot_objects: obj.remove()
[docs]class SingleDishPointingChart(object): def __init__(self, context, ms, antenna, target_field_id=None, reference_field_id=None, target_only=True, ofs_coord=False): self.context = context self.ms = ms self.antenna = antenna self.target_field = self.__get_field(target_field_id) self.reference_field = self.__get_field(reference_field_id) self.target_only = target_only self.ofs_coord = ofs_coord self.figfile = self._get_figfile() self.axes_manager = PointingAxesManager() def __get_field(self, field_id): if field_id is not None: fields = self.ms.get_fields(field_id) assert len(fields) == 1 field = fields[0] LOG.debug('found field domain for %s'%(field_id)) return field else: return None @casa5style_plot def plot(self, revise_plot=False): if revise_plot == False and os.path.exists(self.figfile): return self._get_plot_object() ms = self.ms antenna_id = self.antenna.id datatable_name = os.path.join(self.context.observing_run.ms_datatable_name, ms.basename) datatable = DataTable() datatable.importdata(datatable_name, minimal=False, readonly=True) target_spws = ms.get_spectral_windows(science_windows_only=True) # Search for the first available SPW, antenna combination # observing_pattern is None for invalid combination. spw_id = None for s in target_spws: field_patterns = list(ms.observing_pattern[antenna_id][s.id].values()) if field_patterns.count(None) < len(field_patterns): # at least one valid field exists. spw_id = s.id break if spw_id is None: LOG.info('No data with antenna=%d and spw=%s found in %s' % (antenna_id, str(target_spws), ms.basename)) LOG.info('Skipping pointing plot') return None else: LOG.debug('Generate pointing plot using antenna=%d and spw=%d of %s' % (antenna_id, spw_id, ms.basename)) beam_size = casa_tools.quanta.convert(ms.beam_sizes[antenna_id][spw_id], 'deg') beam_size_in_deg = casa_tools.quanta.getvalue(beam_size) obs_pattern = ms.observing_pattern[antenna_id][spw_id] antenna_ids = datatable.getcol('ANTENNA') spw_ids = datatable.getcol('IF') if self.target_field is None or self.reference_field is None: # plot pointings regardless of field if self.target_only == True: srctypes = datatable.getcol('SRCTYPE') func = lambda j, k, l: j == antenna_id and k == spw_id and l == 0 vfunc = numpy.vectorize(func) dt_rows = vfunc(antenna_ids, spw_ids, srctypes) else: func = lambda j, k: j == antenna_id and k == spw_id vfunc = numpy.vectorize(func) dt_rows = vfunc(antenna_ids, spw_ids) else: field_ids = datatable.getcol('FIELD_ID') if self.target_only == True: srctypes = datatable.getcol('SRCTYPE') field_id = [self.target_field.id] func = lambda f, j, k, l: f in field_id and j == antenna_id and k == spw_id and l == 0 vfunc = numpy.vectorize(func) dt_rows = vfunc(field_ids, antenna_ids, spw_ids, srctypes) else: field_id = [self.target_field.id, self.reference_field.id] func = lambda f, j, k: f in field_id and j == antenna_id and k == spw_id vfunc = numpy.vectorize(func) dt_rows = vfunc(field_ids, antenna_ids, spw_ids) if self.ofs_coord == True: racol = 'OFS_RA' deccol = 'OFS_DEC' else: racol = 'RA' deccol = 'DEC' LOG.debug('column names: {}, {}'.format(racol, deccol)) if racol not in datatable.colnames() or deccol not in datatable.colnames(): return None RA = datatable.getcol(racol)[dt_rows] if len(RA) == 0: # no row found LOG.warn('No data found with antenna=%d, spw=%d, and field=%s in %s.' % (antenna_id, spw_id, str(field_id), ms.basename)) LOG.warn('Skipping pointing plots.') return None DEC = datatable.getcol(deccol)[dt_rows] FLAG = numpy.zeros(len(RA), dtype=int) rows = numpy.where(dt_rows == True)[0] assert len(RA) == len(rows) for (i, row) in enumerate(rows): pflags = datatable.getcell('FLAG_PERMANENT', row) # use flag for pol 0 FLAG[i] = pflags[0][OnlineFlagIndex] self.axes_manager.direction_reference = datatable.direction_ref self.axes_manager.ofs_coord = self.ofs_coord plt.clf() draw_pointing(self.axes_manager, RA, DEC, FLAG, self.figfile, circle=[0.5*beam_size_in_deg], ObsPattern=obs_pattern, plotpolicy='greyed') plt.close() return self._get_plot_object() def _get_figfile(self): session_part = self.ms.session ms_part = self.ms.basename antenna_part = self.antenna.name if self.target_field is None or self.reference_field is None: identifier = antenna_part else: clean_name = self.target_field.clean_name identifier = antenna_part + '.%s'%(clean_name) if self.target_only == True: if self.ofs_coord == True: basename = 'offset_target_pointing.%s'%(identifier) else: basename = 'target_pointing.%s'%(identifier) else: basename = 'whole_pointing.%s'%(identifier) figfile = os.path.join(self.context.report_dir, 'session%s' % session_part, ms_part, '%s.png'%(basename)) return figfile def _get_plot_object(self): intent = 'target' if self.target_only == True else 'target,reference' if self.target_field is None or self.reference_field is None: field_name = '' else: if self.target_only or self.target_field.name == self.reference_field.name: field_name = self.target_field.name else: field_name = self.target_field.name + ',' + self.reference_field.name if self.ofs_coord == True: xaxis = 'Offset R.A.' yaxis = 'Offset Declination' else: xaxis = 'R.A.' yaxis = 'Declination' return logger.Plot(self.figfile, x_axis=xaxis, y_axis=yaxis, parameters={'vis': self.ms.basename, 'antenna': self.antenna.name, 'field': field_name, 'intent': intent})