import argparse
import collections
import glob
import itertools
import math
import os
import sys
from operator import sub
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation, ImageMagickWriter
import pipeline.domain.datatable as datatable
import pipeline.infrastructure.logging as logging
from pipeline.infrastructure import casa_tools
LOG = logging.get_logger(__name__)
MetaDataSet = collections.namedtuple(
'MetaDataSet',
['timestamp', 'dtrow', 'field', 'antenna', 'ra', 'dec', 'srctype', 'pflag'])
def distance(x0, y0, x1, y1):
"""
Compute distance between two points (x0, y0) and (x1, y1).
:param x0: x-coordinate value for point 0
:type x0: float
:param y0: y-coordinate value for point 0
:type y0: float
:param x1: x-coordinate value for point 1
:type x1: float
:param y1: y-coordinate value for point 1
:type y1: float
:return: distance between two points
:rtype: float
"""
_dx = x1 - x0
_dy = y1 - y0
return np.hypot(_dx, _dy)
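# Illustrative example for distance() (values chosen for demonstration only):
#     distance(0.0, 0.0, 3.0, 4.0)  # -> 5.0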
def read_readonly_data(table):
"""
Read necessary columns from the read-only (RO) part of the DataTable.
:param table: table tool instance attached to the RO table
:return: timestamp, dtrow, ra, dec, srctype, antenna, field arrays
"""
timestamp = table.getcol('TIME')
dtrow = np.arange(len(timestamp))
ra = table.getcol('OFS_RA')
dec = table.getcol('OFS_DEC')
srctype = table.getcol('SRCTYPE')
antenna = table.getcol('ANTENNA')
field = table.getcol('FIELD_ID')
return timestamp, dtrow, ra, dec, srctype, antenna, field
def read_readwrite_data(table):
"""
Read the online flag column from the read-write (RW) part of the DataTable.
:param table: table tool instance attached to the RW table
:return: online flag (pflag) array
"""
pflags = table.getcol('FLAG_PERMANENT')
pflag = pflags[0, datatable.OnlineFlagIndex, :]
return pflag
def read_datatable(datatable):
"""
Extract necessary data from a datatable instance.
:param datatable: datatable instance
:type datatable: DataTableImpl
:return: metadata
:rtype: MetaDataSet
"""
timestamp, dtrow, ra, dec, srctype, antenna, field = read_readonly_data(datatable)
pflag = read_readwrite_data(datatable)
metadata = MetaDataSet(
timestamp=timestamp,
dtrow=dtrow,
field=field,
antenna=antenna,
ra=ra, dec=dec,
srctype=srctype,
pflag=pflag)
return metadata
def from_context(context_dir):
"""
Read DataTable located in the context directory.
NOTE: only one DataTable will be loaded for a multi-EB run.
:param context_dir: path to the pipeline context directory
:type context_dir: str
:return: metadata
:rtype: MetaDataSet
"""
datatable_dir = os.path.join(context_dir, 'MSDataTable.tbl')
rotable = glob.glob(f'{datatable_dir}/*.ms/RO')[0]
rwtable = glob.glob(f'{datatable_dir}/*.ms/RW')[0]
tb = casa_tools.table
tb.open(rotable)
try:
timestamp, dtrow, ra, dec, srctype, antenna, field = read_readonly_data(tb)
finally:
tb.close()
tb.open(rwtable)
try:
pflag = read_readwrite_data(tb)
finally:
tb.close()
metadata = MetaDataSet(
timestamp=timestamp,
dtrow=dtrow,
field=field,
antenna=antenna,
ra=ra, dec=dec,
srctype=srctype,
pflag=pflag)
return metadata
def get_science_target_fields(metadata):
"""
Get list of field ids for science targets.
:param metadata: metadata
:type metadata: MetaDataSet
:return: list of field ids for science targets
:rtype: ndarray
"""
return np.unique(metadata.field[metadata.srctype == 0])
def filter_data(metadata, field_id, antenna_id, onsource=True):
"""
Filter metadata.
:param metadata: input metadata
:type metadata: MetaDataSet
:param field_id: field id
:type field_id: int
:param antenna_id: antenna id
:type antenna_id: int
:param onsource: take ON_SOURCE data only, defaults to True
:type onsource: bool, optional
:raises RuntimeError: filter causes empty result
:return: filtered metadata
:rtype: MetaDataSet
"""
mask = np.logical_and(
metadata.antenna == antenna_id,
metadata.field == field_id
)
if onsource:
mask = np.logical_and(mask, metadata.srctype == 0)
srctype = 0
else:
srctype = None
metadata2 = MetaDataSet(
timestamp=metadata.timestamp[mask],
dtrow=metadata.dtrow[mask],
field=field_id,
antenna=antenna_id,
ra=metadata.ra[mask],
dec=metadata.dec[mask],
srctype=srctype,
pflag=metadata.pflag[mask]
)
if len(metadata2.timestamp) == 0:
raise RuntimeError('No data available for field ID {} antenna ID {} {}'.format(
field_id,
antenna_id,
'(ON_SOURCE)' if onsource else ''
))
return metadata2
def squeeze_data(metadata):
"""
Make timestamps in input metadata unique.
:param metadata: input metadata
:type metadata: MetaDataSet
:return: metadata without duplication of timestamp
:rtype: MetaDataSet
"""
utime, idx = np.unique(metadata.timestamp, return_index=True)
urow = metadata.dtrow[idx]
ura = metadata.ra[idx]
udec = metadata.dec[idx]
uflag = metadata.pflag[idx]
if isinstance(metadata.field, (int, np.int32, np.int64)):
ufield = metadata.field
else:
ufield = metadata.field[idx]
if isinstance(metadata.antenna, (int, np.int32, np.int64)):
uant = metadata.antenna
else:
uant = metadata.antenna[idx]
if isinstance(metadata.srctype, (int, np.int32, np.int64)) or metadata.srctype is None:
usrctype = metadata.srctype
else:
usrctype = metadata.srctype[idx]
metadata2 = MetaDataSet(
timestamp=utime,
dtrow=urow,
field=ufield,
antenna=uant,
ra=ura, dec=udec,
srctype=usrctype,
pflag=uflag
)
return metadata2
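# Illustrative example for squeeze_data(), using a minimal hand-made MetaDataSet
# with one duplicated timestamp (all values are assumed for demonstration only):
#     meta = MetaDataSet(timestamp=np.array([0., 0., 1.]), dtrow=np.arange(3),
#                        field=0, antenna=0, ra=np.array([0., 0., 0.1]),
#                        dec=np.zeros(3), srctype=0, pflag=np.ones(3, dtype=int))
#     squeeze_data(meta).timestamp  # -> array([0., 1.])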
def find_time_gap(timestamp):
"""
Find time gaps. The conditions for gaps are:
- time interval > 3 * median(time interval) for small gaps
- time gap > 3 * median(time gap) for large gaps
:param timestamp: list of timestamps; no duplicates, sorted in ascending order
:type timestamp: ndarray
:return: list of small and large time gaps
:rtype: two-tuple of lists
"""
dt = timestamp[1:] - timestamp[:-1]
med = np.median(dt)
gsmall = np.where(dt > 3 * med)[0]
med2 = np.median(dt[gsmall])
glarge = np.where(dt > 3 * med2)[0]
return gsmall, glarge
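# Illustrative example for find_time_gap(), using synthetic timestamps: four rows
# of four 1-second samples, with 7 s between rows except for one much larger gap
# before the last row (values chosen for demonstration only):
#     ts = np.arange(16.); ts[4:] += 6; ts[8:] += 6; ts[12:] += 970
#     find_time_gap(ts)  # -> (array([ 3,  7, 11]), array([11]))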
def gap_gen(gaplist, length=None):
"""
Generate ranges of data (start and end indices) from the
given gap list. The returned values, s and e, can be used
as arr[s:e] to extract the data chunk from the original array arr.
:param gaplist: list of indices indicating gap
:type gaplist: list
:param length: total number of data, defaults to None
:type length: int, optional
:yield: start and end indices
:rtype: two-tuple of integers
"""
n = -1 if length is None else length
if len(gaplist) == 0:
yield 0, n
else:
yield 0, gaplist[0] + 1
for i, j in zip(gaplist[:-1], gaplist[1:]):
yield i + 1, j + 1
yield gaplist[-1] + 1, n
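# Illustrative example for gap_gen(): gap indices 3 and 7 split a 12-point array
# into three chunks (values chosen for demonstration only):
#     list(gap_gen([3, 7], length=12))  # -> [(0, 4), (4, 8), (8, 12)]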
def get_raster_distance(ra, dec, gaplist):
"""
Compute list of distances between raster rows.
Origin of the distance is the first raster row.
Representative position of each raster row is
its midpoint (mean position).
:param ra: list of RA
:type ra: ndarray
:param dec: list of Dec
:type dec: ndarray
:param gaplist: list of indices indicating gaps between raster rows
:type gaplist: list
:return: list of distances between raster rows
:rtype: ndarray
"""
x1 = ra[:gaplist[0] + 1].mean()
y1 = dec[:gaplist[0] + 1].mean()
distance_list = np.fromiter(
(distance(ra[s:e].mean(), dec[s:e].mean(), x1, y1) for s, e in gap_gen(gaplist)),
dtype=float)
return distance_list
def find_raster_gap(timestamp, ra, dec, time_gap=None):
"""
Find gaps between individual raster maps. The returned list should be
used in combination with gap_gen. Here is an example to plot
RA/DEC data per raster map:
import matplotlib.pyplot as plt
gap = find_raster_gap(timestamp, ra, dec)
for s, e in gap_gen(gap):
plt.plot(ra[s:e], dec[s:e], '.')
:param timestamp: list of time stamp
:type timestamp: ndarray
:param ra: list of RA
:type ra: ndarray
:param dec: list of Dec
:type dec: ndarray
:param time_gap: list of index of time gaps, defaults to None
:type time_gap: ndarray, optional
:return: list of index indicating boundary between raster maps
:rtype: ndarray
"""
if time_gap is None:
timegap_small, _ = find_time_gap(timestamp)
else:
timegap_small = time_gap
distance_list = get_raster_distance(ra, dec, timegap_small)
delta_distance = distance_list[1:] - distance_list[:-1]
idx = np.where(delta_distance < 0)
raster_gap = timegap_small[idx]
return raster_gap
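# Illustrative example for find_raster_gap(): two synthetic 3x3 raster maps scanned
# row by row, with 1 s between samples in a row and 10 s between rows (values
# chosen for demonstration only):
#     ra = np.array([0., 1., 2.] * 6)
#     dec = np.repeat([0., 1., 2., 0., 1., 2.], 3)
#     ts = np.arange(18.) + np.repeat(np.arange(6) * 9., 3)
#     find_raster_gap(ts, ra, dec)  # -> array([8])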
def flag_incomplete_raster(meta, raster_gap, nd_raster, nd_row):
"""
Flag incomplete raster maps.
N: number of data per raster map
M: number of data per raster row
MN: median of N => typical number of data per raster map
MM: median of M => typical number of data per raster row
logic:
- if N[x] <= MN - MM then flag whole data in raster map x
- if N[x] >= MN + MM then flag whole data in raster map x and in all subsequent maps
:param meta: metadata object
:type meta: MetaDataSet
:param raster_gap: gap list
:type raster_gap: list
:param nd_raster: typical number of data per raster map (MN)
:type nd_raster: int
:param nd_row: typical number of data per raster row (MM)
:type nd_row: int
:return: list of index for raster map
:rtype: list
"""
gap = gap_gen(raster_gap, len(meta.timestamp))
nd = np.asarray([e - s for s, e in gap])
assert nd_raster >= nd_row
upper_threshold = nd_raster + nd_row
lower_threshold = nd_raster - nd_row
# nd exceeds upper_threshold
test_upper = nd >= upper_threshold
idx = np.where(test_upper)[0]
if len(idx) > 0:
test_upper[idx[-1]:] = True
LOG.debug(f'test_upper={test_upper}')
# nd is less than lower_threshold
test_lower = nd <= lower_threshold
LOG.debug(f'test_lower={test_lower}')
idx = np.where(np.logical_or(test_upper, test_lower))[0]
return idx
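# Illustrative example for flag_incomplete_raster(), using a minimal hand-made
# MetaDataSet (only the number of timestamps and the raster gap list matter here;
# all values are assumed for demonstration only). With a typical map size of 100
# and row size of 10, map 1 (85 points) is too small and map 3 (120 points) is too
# large, so maps 1, 3 and 4 are flagged:
#     meta = MetaDataSet(timestamp=np.zeros(505), dtrow=None, field=0, antenna=0,
#                        ra=None, dec=None, srctype=0, pflag=None)
#     flag_incomplete_raster(meta, [99, 184, 284, 404], 100, 10)  # -> array([1, 3, 4])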
def flag_worm_eaten_raster(meta, raster_gap, nd_row):
"""
Flag a raster map if the length of continuously flagged data
reaches the upper limit given by nd_row.
M: number of data per raster row
MM: median of M => typical number of data per raster row
L: maximum length of continuous flagged data
logic:
- if L[x] >= MM then flag whole data in raster map x
:param meta: metadata object
:type meta: MetaDataSet
:param raster_gap: gap list
:type raster_gap: list
:param nd_row: typical number of data per raster row (MM)
:type nd_row: int
:return: list of index for raster map
:rtype: list
"""
gap = gap_gen(raster_gap, len(meta.timestamp))
# flag
# 1: valid, 0: invalid
flag_raster = [meta.pflag[s:e] for s, e in gap]
LOG.debug(f'Typical number of data per raster row: {nd_row}')
flag_continuous = [
np.fromiter(
map(sum, (f[i:i + nd_row] for i in range(len(f) - nd_row + 1))),
dtype=int
)
for f in flag_raster
]
min_count = np.fromiter(
(x.min() for x in flag_continuous),
dtype=int
)
LOG.debug(f'Minimum number of continuous valid data: {min_count}')
test = min_count == 0
LOG.debug(f'test={test}')
idx = np.where(test)[0]
return idx
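# Illustrative example for flag_worm_eaten_raster(): two maps of ten points each,
# where the second map contains a run of three consecutive flagged (0) points that
# matches the typical row length of 3, so that map is flagged (values are assumed
# for demonstration only):
#     pflag = np.array([1] * 10 + [1, 1, 0, 0, 0, 1, 1, 1, 1, 1])
#     meta = MetaDataSet(timestamp=np.zeros(20), dtrow=None, field=0, antenna=0,
#                        ra=None, dec=None, srctype=0, pflag=pflag)
#     flag_worm_eaten_raster(meta, [9], 3)  # -> array([1])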
def get_raster_flag_list(flagged1, flagged2, raster_gap, ndata):
"""
Merge flag results and convert raster ids into a list of data indices.
:param flagged1: list of flagged raster id
:type flagged1: list
:param flagged2: list of flagged raster id
:type flagged2: list
:param raster_gap: list of gaps between raster maps
:type raster_gap: list
:param ndata: total number of data points
:type ndata: int
:return: list of data ids to be flagged
:rtype: ndarray
"""
flagged = set(flagged1).union(set(flagged2))
gap = list(gap_gen(raster_gap, ndata))
g = (range(*gap[i]) for i in flagged)
data_ids = np.fromiter(itertools.chain(*g), dtype=int)
return data_ids
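# Illustrative example for get_raster_flag_list(): three maps delimited by gaps at
# indices 3 and 7 in a 12-point data set; flagging maps 0 and 2 selects their data
# indices (element order follows set iteration and may vary):
#     get_raster_flag_list([0], [2], [3, 7], 12)  # -> array([ 0,  1,  2,  3,  8,  9, 10, 11])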
def flag_raster_map(metadata):
"""
Return per-antenna list of indices to be flagged by the flagging
heuristics for raster scans.
:param metadata: metadata
:type metadata: MetaDataSet
:return: per-antenna list of indices to be flagged
:rtype: list
"""
field_list = get_science_target_fields(metadata)
rows_per_field = [flag_raster_map_per_field(metadata, f) for f in field_list]
rows_per_antenna = zip(*rows_per_field)
rows_merged = list(map(np.concatenate, rows_per_antenna))
return rows_merged
def find_most_frequent(v):
"""
Return the most frequent number (mode) in v. Ties are resolved
by returning the largest of the modes.
:param v: data
:type v: ndarray
:return: most frequent number (mode)
:rtype: int
"""
values, counts = np.unique(v, return_counts=True)
max_count = counts.max()
LOG.trace(f'max count: {max_count}')
modes = values[counts == max_count]
LOG.trace(f'modes: {modes}')
if len(modes) > 1:
mode = modes.max()
else:
mode = modes[0]
LOG.trace(f'mode = {mode}')
return mode
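# Illustrative example for find_most_frequent(): the tie between 5 and 7 is
# resolved to the larger value (values chosen for demonstration only):
#     find_most_frequent(np.array([3, 5, 5, 7, 7]))  # -> 7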
def flag_raster_map_per_field(metadata, field_id):
"""
Flag raster maps based on two flagging heuristics for the
given field id.
:param metadata: metadata
:type metadata: MetaDataSet
:param field_id: field id to process
:type field_id: int
:return: per-antenna list of data ids to be flagged
:rtype: list of ndarray
"""
# metadata per antenna (with duplication)
antenna_list = np.unique(metadata.antenna)
meta_per_ant_dup = [filter_data(metadata, field_id, a) for a in antenna_list]
# metadata per antenna (without duplication)
meta_per_ant = [squeeze_data(meta) for meta in meta_per_ant_dup]
ndata_per_ant = list(map(lambda x: len(x.timestamp), meta_per_ant))
# get time gap
time_gap_per_ant = [find_time_gap(m.timestamp)[0] for m in meta_per_ant]
LOG.trace('{} {}'.format(len(meta_per_ant), len(time_gap_per_ant)))
# get raster gap
raster_gap_per_ant = [
find_raster_gap(m.timestamp, m.ra, m.dec, gap)
for m, gap in zip(meta_per_ant, time_gap_per_ant)
]
# compute number of data per raster row
num_data_per_raster_row = [
[len(m.ra[s:e]) for s, e in gap_gen(gap)]
for m, gap in zip(meta_per_ant, time_gap_per_ant)
]
LOG.trace(num_data_per_raster_row)
nd_per_row_rep = find_most_frequent(
np.fromiter(itertools.chain(*num_data_per_raster_row), dtype=int)
)
LOG.debug('number of raster row: {}'.format(list(map(len, num_data_per_raster_row))))
LOG.debug(f'most frequent # of data per raster row: {nd_per_row_rep}')
# compute number of data per raster map
num_data_per_raster_map = [
[len(m.ra[s:e]) for s, e in gap_gen(gap)]
for m, gap in zip(meta_per_ant, raster_gap_per_ant)
]
LOG.trace(num_data_per_raster_map)
nd_per_raster_rep = find_most_frequent(
np.fromiter(itertools.chain(*num_data_per_raster_map), dtype=int)
)
LOG.debug('number of raster map: {}'.format(list(map(len, num_data_per_raster_map))))
LOG.debug(f'most frequent # of data per raster map: {nd_per_raster_rep}')
LOG.debug('nominal number of row per raster map: {}'.format(nd_per_raster_rep // nd_per_row_rep))
# flag incomplete raster map
flag_raster1 = [
flag_incomplete_raster(m, gap, nd_per_raster_rep, nd_per_row_rep)
for m, gap in zip(meta_per_ant, raster_gap_per_ant)
]
# flag raster map if it contains continuous flagged data
# whose length is larger than the number of data per raster row
flag_raster2 = [
flag_worm_eaten_raster(m, gap, nd_per_row_rep)
for m, gap in zip(meta_per_ant, raster_gap_per_ant)
]
# merge flag result and convert raster id to list of data index
flag_list = [
get_raster_flag_list(f1, f2, gap, n)
for f1, f2, gap, n in zip(flag_raster1, flag_raster2, raster_gap_per_ant, ndata_per_ant)
]
LOG.trace(flag_list)
# get timestamp list
time_list = [
set((m.timestamp[i] for i in f))
for f, m in zip(flag_list, meta_per_ant)
]
# convert timestamp list into row list
row_list = [
m.dtrow[[x in t for x in m.timestamp]]
for t, m in zip(time_list, meta_per_ant_dup)
]
return row_list
def get_aspect(ax):
"""
Compute the aspect ratio (display units over data units) of a matplotlib Axes.
:param ax: Axes object to examine
:type ax: Axes
:return: aspect ratio
:rtype: float
"""
# Total figure size
figW, figH = ax.get_figure().get_size_inches()
# Axis size on figure
_, _, w, h = ax.get_position().bounds
# Ratio of display units
disp_ratio = (figH * h) / (figW * w)
# Ratio of data units
# Negative over negative because of the order of subtraction
data_ratio = sub(*ax.get_ylim()) / sub(*ax.get_xlim())
return disp_ratio / data_ratio
def get_angle(dx, dy, aspect_ratio=1):
"""
Compute tangential angle taking into account aspect ratio.
:param dx: length along x-axis
:type dx: float
:param dy: length along y-axis
:type dy: float
:param aspect_ratio: aspect_ratio, defaults to 1
:type aspect_ratio: float, optional
:return: tangential angle
:rtype: float
"""
offset = 30
theta = math.degrees(math.atan2(dy * aspect_ratio, dx))
return offset + theta
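# Illustrative example for get_angle(): a 45-degree slope with unit aspect ratio,
# shifted by the fixed 30-degree marker offset:
#     get_angle(1.0, 1.0)  # -> 75.0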
def anim_gen(ra, dec, idx_generator, dist_list, cmap):
"""
Generator for generate_animation.
:param ra: list of RA
:type ra: ndarray
:param dec: list of Dec
:type dec: ndarray
:param idx_generator: generator yielding start and end indices indicating raster row
:type idx_generator: generator
:param dist_list: distance list
:type dist_list: ndarray
:param cmap: color map
:type cmap: ListedColormap
:yield: position, color, and boolean flag to clear existing plot
:rtype: tuple
"""
dist_prev = 0
cidx = 0
raster_flag = False
for idx, dist in zip(idx_generator, dist_list):
print('{} - {} = {}'.format(dist, dist_prev, dist - dist_prev))
if dist - dist_prev < 0:
print('updating cidx {}'.format(cidx))
cidx = (cidx + 1) % cmap.N
raster_flag = True
color = cmap(cidx)
yield ra[idx[0]:idx[1]], dec[idx[0]:idx[1]], color, raster_flag
dist_prev = dist
raster_flag = False
raster_flag = True
cidx = 0
color = cmap(cidx)
yield None, None, color, raster_flag
def animate(i):
"""
Generate the plot corresponding to a single frame.
:param i: position, color, and boolean flag to clear existing plot
:type i: tuple
:return: lines for this frame
:rtype: list of Line2D
"""
ra, dec, c, flag = i
print(c)
if flag is True:
# clear existing raster scan
for l in plt.gca().get_lines()[1:]:
l.remove()
if ra is None:
return []
dx = np.median(ra[1:] - ra[:-1])
dy = np.median(dec[1:] - dec[:-1])
aspect = get_aspect(plt.gca())
angle = get_angle(dx, dy, aspect)
lines = plt.plot(ra, dec, marker=(3, 0, angle), color=c, linewidth=1, markersize=6)
return lines
def generate_animation(ra, dec, gaplist, figfile='movie.gif'):
"""
Generate animation GIF file to illustrate observing pattern.
:param ra: list of RA
:type ra: ndarray
:param dec: list of Dec
:type dec: ndarray
:param gaplist: list of gaps between raster rows
:type gaplist: list
:param figfile: output file name, defaults to 'movie.gif'
:type figfile: str, optional
"""
row_distance = get_raster_distance(ra, dec, gaplist)
cmap = plt.get_cmap('tab10')
fig = plt.figure()
plt.clf()
anim = FuncAnimation(
fig, animate,
anim_gen(ra, dec, gap_gen(gaplist), row_distance, cmap),
init_func=lambda: plt.plot(ra, dec, '.', color='gray', markersize=2),
blit=True)
anim.save(figfile, writer=ImageMagickWriter(fps=4))
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Generate gif animation of raster pattern'
)
parser.add_argument('context_dir', type=str, help='context directory')
parser.add_argument('-a', '--antenna', action='store', dest='antenna', type=int, default=0)
parser.add_argument('-f', '--field', action='store', dest='field', type=int, default=-1)
args = parser.parse_args()
print('DEBUG: antenna={}'.format(args.antenna))
print('DEBUG: field={}'.format(args.field))
print('DEBUG: context_dir="{}"'.format(args.context_dir))
metadata = from_context(args.context_dir)
# Field ID to process
science_targets = get_science_target_fields(metadata)
print(f'DEBUG: science target list: {science_targets}')
if args.field == -1:
field = science_targets[0]
else:
field = args.field
if field not in science_targets:
print(f'ERROR: science target field {field} does not exist')
sys.exit(1)
# ON-SOURCE with Antenna selection
metaon = filter_data(metadata, field, args.antenna, True)
utime = metaon.timestamp
ura = metaon.ra
udec = metaon.dec
if len(utime) == 0:
print('ERROR: antenna {} for field {} does not exist'.format(args.antenna, field))
sys.exit(1)
gsmall, glarge = find_time_gap(utime)
figfile = 'pointing.field{}ant{}.gif'.format(field, args.antenna)
generate_animation(ura, udec, gsmall, figfile=figfile)