import os
import math
import shutil
import numpy
import casatasks.private.sdbeamutil as sdbeamutil
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.imagelibrary as imagelibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.domain import DataTable
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from . import resultobjects
from .. import common
from ..common import utils
from ..common import utils as sdutils
from ..common import direction_utils as dirutil
LOG = infrastructure.get_logger(__name__)
[docs]def ALMAImageCoordinateUtil(context, ms_names, ant_list, spw_list, fieldid_list):
"""
An utility function to calculate spatial coordinate of image for ALMA
Items in ant_list can be None, which indicates that the function will take into
account pointing data from all the antennas in MS.
"""
# A flag to use field direction as image center (True) rather than center of the map extent
USE_FIELD_DIR = False
idx = utils.get_parent_ms_idx(context, ms_names[0])
if idx >= 0 and idx < len(context.observing_run.measurement_sets):
ref_msobj = context.observing_run.measurement_sets[idx]
else:
raise ValueError("The reference ms, %s, not registered to context" % ms_names[0])
ref_fieldid = fieldid_list[0]
ref_spw = spw_list[0]
source_name = ref_msobj.fields[ref_fieldid].name
# trim source name to workaround '"name"' type of sources
# trimmed_name = source_name.strip('"')
is_eph_obj = ref_msobj.get_fields(ref_fieldid)[0].source.is_eph_obj
is_known_eph_obj = ref_msobj.get_fields(ref_fieldid)[0].source.is_known_eph_obj
# qa tool
qa = casa_tools.quanta
### ALMA specific part ###
# the number of pixels per beam
grid_factor = 9.0
# recommendation by EOC
fwhmfactor = 1.13
# hard-coded for ALMA-TP array
diameter_m = 12.0
obscure_alma = 0.75
### END OF ALMA part ###
# msmd-less implementation
spw = ref_msobj.get_spectral_window(ref_spw)
freq_hz = numpy.float64(spw.mean_frequency.value)
# fields = ref_msobj.get_fields(name=trimmed_name)
fields = ref_msobj.get_fields(name=source_name)
fnames = [f.name for f in fields]
if USE_FIELD_DIR:
me_center = fields[0].mdirection
# cellx and celly
theory_beam_arcsec = sdbeamutil.primaryBeamArcsec(freq_hz, diameter_m, obscure_alma, 10.0, fwhmfactor=fwhmfactor)
grid_size = qa.quantity(theory_beam_arcsec, 'arcsec')
cellx = qa.div(grid_size, grid_factor)
celly = cellx
cell_in_deg = qa.convert(cellx, 'deg')['value']
LOG.info('Calculating image coordinate of field \'%s\', reference frequency %fGHz' % (fnames[0], freq_hz * 1.e-9))
LOG.info('cell=%s' % (qa.tos(cellx)))
# nx, ny and center
parent_mses = [utils.get_parent_ms_name(context, name) for name in ms_names]
ra0 = []
dec0 = []
outref = None
org_direction = None
for vis, ant_id, field_id, spw_id in zip(parent_mses, ant_list, fieldid_list, spw_list):
# get first org_direction if source if ephemeris source
# if is_eph_obj and org_direction==None:
if ( is_eph_obj or is_known_eph_obj ) and org_direction==None:
# get org_direction
msobj = context.observing_run.get_ms(vis)
org_direction = msobj.get_fields(field_id)[0].source.org_direction
datatable_name = os.path.join(context.observing_run.ms_datatable_name, os.path.basename(vis))
datatable = DataTable(name=datatable_name, readonly=True)
if (datatable.getcolkeyword('RA', 'UNIT') != 'deg') or \
(datatable.getcolkeyword('DEC', 'UNIT') != 'deg') or \
(datatable.getcolkeyword('OFS_RA', 'UNIT') != 'deg') or \
(datatable.getcolkeyword('OFS_DEC', 'UNIT') != 'deg'):
raise RuntimeError("Found unexpected unit of RA/DEC in DataTable. It should be in 'deg'")
if ant_id is None:
# take all the antennas into account
msobj = context.observing_run.get_ms(vis)
num_antennas = len(msobj.antennas)
_vislist = [vis for _ in range(num_antennas)]
_antlist = [i for i in range(num_antennas)]
_fieldlist = [field_id for _ in range(num_antennas)]
_spwlist = [spw_id for _ in range(num_antennas)]
index_list = sorted(common.get_index_list_for_ms(datatable, _vislist, _antlist, _fieldlist, _spwlist))
else:
index_list = sorted(common.get_index_list_for_ms(datatable, [vis], [ant_id], [field_id], [spw_id]))
# if is_eph_obj:
if is_eph_obj or is_known_eph_obj:
_ra = datatable.getcol('OFS_RA').take(index_list)
_dec = datatable.getcol('OFS_DEC').take(index_list)
else:
_ra = datatable.getcol('RA').take(index_list)
_dec = datatable.getcol('DEC').take(index_list)
ra0.extend(_ra)
dec0.extend(_dec)
outref = datatable.direction_ref
del datatable
if len(ra0) == 0:
antenna_name = ref_msobj.antennas[ant_list[0]].name
LOG.warn('No valid data for source %s antenna %s spw %s in %s. Image will not be created.' % (
source_name, antenna_name, ref_spw, ref_msobj.basename))
return False
if outref is None:
LOG.warn('No direction reference is set. Assuming ICRS')
outref = 'ICRS'
# if is_eph_obj:
# ra_offset = qa.convert( org_direction['m0'], 'deg' )['value']
# dec_offset = qa.convert( org_direction['m1'], 'deg' )['value']
# else:
# ra_offset = 0.0
# dec_offset = 0.0
# convert offset coordinate into shifted coordinate for ephemeris sources
# to determine the image size (size, center, npix)
if is_eph_obj or is_known_eph_obj:
ra = []
dec = []
for ra1, dec1 in zip( ra0, dec0 ):
ra2, dec2 = dirutil.direction_recover( ra1, dec1, org_direction )
ra.append(ra2)
dec.append(dec2)
else:
ra = ra0
dec = dec0
del ra0, dec0
ra_min = min(ra)
ra_max = max(ra)
dec_min = min(dec)
dec_max = max(dec)
if USE_FIELD_DIR:
# phasecenter = field direction
ra_center = qa.convert(me_center['m0'], 'deg')
dec_center = qa.convert(me_center['m1'], 'deg')
else:
# map center
ra_center = qa.quantity(0.5 * (ra_min + ra_max), 'deg')
dec_center = qa.quantity(0.5 * (dec_min + dec_max), 'deg')
ra_center_in_deg = qa.getvalue(ra_center)
dec_center_in_deg = qa.getvalue(dec_center)
phasecenter = '{0} {1} {2}'.format(outref,
qa.formxxx(ra_center, 'hms'),
qa.formxxx(dec_center, 'dms'))
LOG.info('phasecenter=\'%s\'' % (phasecenter,))
dec_correction = 1.0 / math.cos(dec_center_in_deg / 180.0 * 3.1415926535897931)
width = 2 * max(abs(ra_center_in_deg - ra_min), abs(ra_max - ra_center_in_deg))
height = 2 * max(abs(dec_center_in_deg - dec_min), abs(dec_max - dec_center_in_deg))
LOG.debug('Map extent: [%f, %f] arcmin' % (width / 60., height / 60.))
nx = int(width / (cell_in_deg * dec_correction)) + 1
ny = int(height / cell_in_deg) + 1
# Adjust nx and ny to be even number for performance (which is
# recommended by imager).
# Also increase nx and ny by 2 if they are even number.
# This is due to a behavior of the imager. The imager configures
# direction axis as follows:
# reference value: phasecenter
# increment: cellx, celly
# reference pixel: ceil((nx-1)/2), ceil((ny-1)/2)
# It means that reference pixel will not be map center if nx/ny
# is even number. It results in asymmetric area coverage on both
# sides of the reference pixel, which may miss certain range of
# (order of 1 pixel) observed area near the edge.
if nx % 2 == 0:
nx += 2
else:
nx += 1
if ny % 2 == 0:
ny += 2
else:
ny += 1
LOG.info('Image pixel size: [nx, ny] = [%s, %s]' % (nx, ny))
return phasecenter, cellx, celly, nx, ny, org_direction
[docs]class SDImagingWorker(basetask.StandardTaskTemplate):
Inputs = SDImagingWorkerInputs
is_multi_vis_task = True
[docs] def prepare(self):
inputs = self.inputs
context = self.inputs.context
infiles = inputs.infiles
outfile = inputs.outfile
edge = inputs.edge
antid_list = inputs.antids
spwid_list = inputs.spwids
fieldid_list = inputs.fieldids
imagemode = inputs.mode
file_index = [common.get_parent_ms_idx(context, name) for name in infiles]
mses = context.observing_run.measurement_sets
v_spwids = [context.observing_run.real2virtual_spw_id(i, mses[j]) for i, j in zip(spwid_list, file_index)]
rep_ms = mses[file_index[0]]
ant_name = rep_ms.antennas[antid_list[0]].name
source_name = rep_ms.fields[fieldid_list[0]].clean_name
phasecenter, cellx, celly, nx, ny, org_direction = self._get_map_coord(inputs, context, infiles, antid_list, spwid_list,
fieldid_list)
status = self._do_imaging(infiles, antid_list, spwid_list, fieldid_list, outfile, imagemode, edge, phasecenter,
cellx, celly, nx, ny)
if status is True:
# missing attributes in result instance will be filled in by the
# parent class
image_item = imagelibrary.ImageItem(imagename=outfile,
sourcename=source_name,
spwlist=v_spwids, # virtual
specmode='cube',
sourcetype='TARGET',
org_direction=org_direction)
image_item.antenna = ant_name # name #(group name)
outcome = {}
outcome['image'] = image_item
result = resultobjects.SDImagingResultItem(task=None,
success=True,
outcome=outcome)
else:
# Imaging failed due to missing valid data
result = resultobjects.SDImagingResultItem(task=None,
success=False,
outcome=None)
return result
[docs] def analyse(self, result):
return result
def _get_map_coord(self, inputs, context, infiles, ant_list, spw_list, field_list):
params = (inputs.phasecenter, inputs.cellx, inputs.celly, inputs.nx, inputs.ny, inputs.org_direction)
coord_set = (params.count(None) == 0) or ( (params.count(None) == 1) and inputs.org_direction is None )
if coord_set:
return params
else:
params = ALMAImageCoordinateUtil(context, infiles, ant_list, spw_list, field_list)
if not params:
raise RuntimeError( "No valid data" )
return params
def _do_imaging(self, infiles, antid_list, spwid_list, fieldid_list, imagename, imagemode, edge, phasecenter, cellx,
celly, nx, ny):
context = self.inputs.context
is_nro = sdutils.is_nro(context)
idx = utils.get_parent_ms_idx(context, infiles[0])
if idx >= 0 and idx < len(context.observing_run.measurement_sets):
reference_data = context.observing_run.measurement_sets[idx]
else:
raise ValueError("The reference ms, %s, not registered to context" % infiles[0])
ref_spwid = spwid_list[0]
LOG.debug('Members to be processed:')
for (m, a, s, f) in zip(infiles, antid_list, spwid_list, fieldid_list):
LOG.debug('\tMS %s: Antenna %s Spw %s Field %s'%(os.path.basename(m), a, s, f))
# Check for ephemeris source
# known_ephemeris_list = ['MERCURY', 'VENUS', 'MARS', 'JUPITER', 'SATURN', 'URANUS', 'NEPTUNE', 'PLUTO', 'SUN',
# 'MOON']
ephemsrcname = ''
reference_field = reference_data.fields[fieldid_list[0]]
source_name = reference_field.name
is_eph_obj = reference_data.get_fields(fieldid_list[0])[0].source.is_eph_obj
is_known_eph_obj = reference_data.get_fields(fieldid_list[0])[0].source.is_known_eph_obj
# for ephemeris sources without ephemeris data in MS (eg. not for ALMA)
# if not is_eph_obj:
if is_known_eph_obj:
# me = casa_tools.measures
# ephemeris_list = me.listcodes(me.direction())['extra']
# known_ephemeris_list = numpy.delete( ephemeris_list, numpy.where(ephemeris_list=='COMET') )
# if source_name.upper() in known_ephemeris_list:
ephemsrcname = source_name.upper()
# baseline
# baseline = '0&&&'
# mode
mode = 'channel'
# stokes
stokes = self.inputs.stokes
# start, nchan, step
ref_spwobj = reference_data.spectral_windows[ref_spwid]
total_nchan = ref_spwobj.num_channels
if total_nchan == 1:
start = 0
step = 1
nchan = 1
else:
start = edge[0]
step = 1
nchan = total_nchan - sum(edge)
# ampcal
if imagemode == 'AMPCAL':
step = nchan
nchan = 1
# restfreq
restfreq = self.inputs.restfreq
if not isinstance(restfreq, str):
raise RuntimeError("Invalid type for restfreq '{0}' (not a string)".format(restfreq))
if restfreq.strip() == '':
# if restfreq is NOT given by user
# first try using SOURCE.REST_FREQUENCY
# if it is not available, use SPECTRAL_WINDOW.REF_FREQUENCY instead
source_id = reference_field.source_id
rest_freq_value = utils.get_restfrequency(vis=infiles[0], spwid=ref_spwobj.id, source_id=source_id)
rest_freq_unit = 'Hz'
if rest_freq_value is None:
# REST_FREQUENCY is not defined in the SOURCE tableq
rest_freq = ref_spwobj.ref_frequency
rest_freq_value = numpy.double(rest_freq.value)
rest_freq_unit = rest_freq.units['symbol']
if rest_freq_value is not None:
qa = casa_tools.quanta
restfreq = qa.tos(qa.quantity(rest_freq_value, rest_freq_unit))
else:
raise RuntimeError("Could not get reference frequency of Spw %d" % ref_spwid)
else:
# restfreq is given by user
# check if user provided restfreq is valid
qa = casa_tools.quanta
x = qa.quantity(restfreq)
if x['value'] <= 0:
raise RuntimeError("Invalid restfreq '{0}' (must be positive)".format(restfreq))
x = qa.convert(x, 'Hz')
if qa.getunit(x) != 'Hz':
raise RuntimeError("Invalid restfreq '{0}' (inappropriate unit)".format(restfreq))
# outframe
outframe = 'LSRK'
# gridfunction
gridfunction = 'SF'
# truncate, gwidth, jwidth, and convsupport
truncate = gwidth = jwidth = -1 # defaults (not used)
# PIPE-689: convsupport should be 3 for NRO Pipeline
if is_nro:
convsupport = 3
else:
convsupport = 6
# temporary_name = imagename.rstrip('/')+'.tmp'
cleanup_params = ['outfile', 'infiles', 'spw', 'scan']
# phasecenter=TRACKFIELD only for sources with ephemeris table
if is_eph_obj:
phasecenter = 'TRACKFIELD'
LOG.info( "phasecenter is overrided with \'TRACKFIELD\'" )
qa = casa_tools.quanta
image_args = {'mode': mode,
'intent': "OBSERVE_TARGET#ON_SOURCE",
'nchan': nchan,
'start': start,
'width': step,
# the task only accepts lower letter
'outframe': outframe.lower(),
'gridfunction': gridfunction,
'convsupport': convsupport,
'truncate': truncate,
'gwidth': gwidth,
'jwidth': jwidth,
'imsize': [nx, ny],
'cell': [qa.tos(cellx), qa.tos(celly)],
'phasecenter': phasecenter,
'restfreq': restfreq,
'stokes': stokes,
'ephemsrcname': ephemsrcname}
# remove existing image explicitly
for rmname in [imagename, imagename.rstrip('/') + '.weight']:
if os.path.exists(rmname):
shutil.rmtree(rmname)
# imaging
infile_list = []
spwsel_list = []
fieldsel_list = []
antsel_list = []
for (msname, ant, spw, field) in zip(infiles, antid_list, spwid_list, fieldid_list):
LOG.debug('Registering data to image: vis=\'%s\', ant=%s, spw=%s, field=%s%s'%(msname, ant, spw, field,
(' (ephemeris source)' if ephemsrcname!='' else '')))
infile_list.append(msname)
spwsel_list.append(str(spw))
fieldsel_list.append(str(field))
antsel_list.append(str(ant))
# collapse selection if possible
spwsel_list = spwsel_list[0] if len(set(spwsel_list)) == 1 else spwsel_list
fieldsel_list = fieldsel_list[0] if len(set(fieldsel_list)) == 1 else fieldsel_list
antsel_list = antsel_list[0] if len(set(antsel_list)) == 1 else antsel_list
# set-up image dependent parameters
for p in cleanup_params: image_args[p] = None
image_args['outfile'] = imagename
image_args['infiles'] = infile_list
image_args['spw'] = spwsel_list
image_args['field'] = fieldsel_list
image_args['antenna'] = antsel_list
LOG.debug('Executing sdimaging task: args=%s' % (image_args))
# execute job
# tentative soltion for tsdimaging speed issue
if phasecenter == 'TRACKFIELD':
image_job = casa_tasks.tsdimaging(**image_args)
self._executor.execute(image_job)
# tsdimaging changes the image filename, workaround to revert it
imagename_tmp = imagename + '.image'
os.rename( imagename_tmp, imagename )
else:
image_job = casa_tasks.sdimaging(**image_args)
self._executor.execute(image_job)
# check imaging result
imagename = image_args['outfile']
weightname = imagename + '.weight'
if not os.path.exists(imagename) or not os.path.exists(weightname):
LOG.error("Generation of %s failed" % imagename)
return False
# check for valid pixels (non-zero weight)
# Task sdimaging does not fail even if no data is gridded to image.
# In that case, image is not masked, no restoring beam is set to
# image, and all pixels in corresponding weight image is zero.
with casa_tools.ImageReader(weightname) as ia:
sumsq = ia.statistics()['sumsq'][0]
if sumsq == 0.0:
LOG.warning("No valid pixel found in image, %s. Discarding the image from futher processing." % imagename)
return False
return True