Source code for pipeline.hifv.heuristics.bandpass

import os

import numpy as np

import pipeline.infrastructure.utils as utils
import pipeline.infrastructure as infrastructure
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from . import uvrange

LOG = infrastructure.get_logger(__name__)


[docs]def removeRows(caltable, spwids): tb = casa_tools.table() tb.open(caltable, nomodify=False) for spwid in spwids: subtb = tb.query('SPECTRAL_WINDOW_ID == '+str(spwid)) flaggedrows = subtb.rownumbers() if len(flaggedrows) > 0: tb.removerows(flaggedrows) subtb.close() tb.flush() tb.close() tb.open(caltable + '/SPECTRAL_WINDOW', nomodify=False) for spwid in spwids: temparray = tb.getcol('FLAG_ROW') temparray[spwid] = True tb.putcol('FLAG_ROW', temparray) tb.close()
[docs]def computeChanFlag(vis, caltable, context): m = context.observing_run.get_ms(vis) channels = m.get_vla_numchan() with casa_tools.TableReader(caltable) as table: spwVarCol = table.getvarcol('SPECTRAL_WINDOW_ID') dataVarCol = table.getvarcol('CPARAM') flagVarCol = table.getvarcol('FLAG') rowlist = sorted(dataVarCol.keys()) spwids = [] largechunk = False for rrow in rowlist: dataArr = dataVarCol[rrow] flagArr = flagVarCol[rrow] spwArr = spwVarCol[rrow] ispw = spwArr[0] fivepctch = int(0.05*channels[ispw]) startch1 = 0 startch2 = fivepctch - 1 endch1 = channels[ispw] - fivepctch endch2 = channels[ispw] - 1 if (fivepctch < 3): startch2=2 endch1=channels[ispw]-3 # Find flagged ranges in both polarizations rangeA = utils.flagged_intervals(flagArr[0,:].flatten()) rangeB = utils.flagged_intervals(flagArr[1,:].flatten()) # print rangeA, rangeB # If no solutions are found, only one tuple is returned and make note ''' try: if ((rangeA[0][0] == 0 and rangeA[0][1] == len(flagArr[0])-1) or (rangeB[0][0] == 0 and rangeB[0][1] == len(flagArr[1])-1)): LOG.warn("channel pre-averaging bandpass calibration heuristic could not recover solutions for spw="+str(spwArr[0])) print rangeA, rangeB spwids.append(spwArr[0]) largechunk = True except: LOG.warn("Problem with using channel pre-averaging bandpass calibration heuristic - check CASA log") ''' # Determine contiguous lengths of failed solutions for both polarizations, but ignoring edge flagging for row in rangeA[1:-1]: length = row[-1]-row[0] spwids.append(spwArr[0]) LOG.info('WEAKBP FAILED SOLUTION: SPW '+str(spwArr[0])+': '+str(row[0])+'~'+str(row[-1])) if length > len(flagArr[0])/32.0: largechunk = True for row in rangeB[1:-1]: length = row[-1]-row[0] spwids.append(spwArr[0]) LOG.info('WEAKBP FAILED SOLUTION: SPW '+str(spwArr[0])+': '+str(row[0])+'~'+str(row[-1])) if length > len(flagArr[1])/32.0: largechunk = True # print rrow.rjust(4), 'Pol A:', str(np.sum(flagArr[0])).rjust(4),' Pol B:', # str(np.sum(flagArr[1])).rjust(4), # ' / ',str(len(flagArr[0])), # ' chan flagged ', '(', 100.0*float(np.sum(flagArr[0]))/len(flagArr[0]), # '%, ', 100.0*float(np.sum(flagArr[1]))/len(flagArr[1]), '%)' spwids = np.unique(spwids) spwids = list(spwids) return (largechunk, spwids)
[docs]def do_bandpass(vis, caltable, context=None, RefAntOutput=None, spw=None, ktypecaltable=None, bpdgain_touse=None, solint=None, append=None, executor=None): """Run CASA task bandpass""" m = context.observing_run.get_ms(vis) bandpass_field_select_string = context.evla['msinfo'][m.name].bandpass_field_select_string bandpass_scan_select_string = context.evla['msinfo'][m.name].bandpass_scan_select_string minBL_for_cal = m.vla_minbaselineforcal() try: setjy_results = context.results[0].read()[0].setjy_results except Exception as e: setjy_results = context.results[0].read().setjy_results BPGainTables = sorted(context.callibrary.active.get_caltable()) BPGainTables.append(ktypecaltable) BPGainTables.append(bpdgain_touse) bandpass_task_args = {'vis': vis, 'caltable': caltable, 'field': '', 'spw': spw, 'intent': '', 'selectdata': True, 'uvrange': '', 'scan': bandpass_scan_select_string, 'solint': solint, 'combine': 'scan', 'refant': ','.join(RefAntOutput), 'minblperant': minBL_for_cal, 'minsnr': 5.0, 'solnorm': False, 'bandtype': 'B', 'fillgaps': 0, 'smodel': [], 'append': append, 'docallib': False, 'gaintable': BPGainTables, 'gainfield': [''], 'interp': [''], 'spwmap': [], 'parang': True} bpscanslist = list(map(int, bandpass_scan_select_string.split(','))) scanobjlist = m.get_scans(scan_id=bpscanslist) allfieldidlist = [] for scanobj in scanobjlist: fieldobj, = scanobj.fields if str(fieldobj.id) not in allfieldidlist: allfieldidlist.append(str(fieldobj.id)) # See vlascanheuristics - only use the first bandpass calibrator fieldidlist = [fieldid for fieldid in allfieldidlist if fieldid in bandpass_field_select_string] for fieldidstring in fieldidlist: fieldid = int(fieldidstring) uvrangestring = uvrange(setjy_results, fieldid) bandpass_task_args['field'] = fieldidstring bandpass_task_args['uvrange'] = uvrangestring if os.path.exists(caltable): bandpass_task_args['append'] = True job = casa_tasks.bandpass(**bandpass_task_args) executor.execute(job) return True
[docs]def do_bandpassweakbp(vis, caltable, context=None, RefAntOutput=None, spw=None, ktypecaltable=None, bpdgain_touse=None, solint=None, append=None): """Run CASA task bandpass""" m = context.observing_run.get_ms(vis) bandpass_field_select_string = context.evla['msinfo'][m.name].bandpass_field_select_string bandpass_scan_select_string = context.evla['msinfo'][m.name].bandpass_scan_select_string minBL_for_cal = m.vla_minbaselineforcal() BPGainTables = sorted(context.callibrary.active.get_caltable()) BPGainTables.append(ktypecaltable) BPGainTables.append(bpdgain_touse) bandpass_task_args = {'vis': vis, 'caltable': caltable, 'field': bandpass_field_select_string, 'spw': spw, 'intent': '', 'selectdata': True, 'uvrange': '', 'scan': bandpass_scan_select_string, 'solint': solint, 'combine': 'scan', 'refant': ','.join(RefAntOutput), 'minblperant': minBL_for_cal, 'minsnr': 5.0, 'solnorm': False, 'bandtype': 'B', 'fillgaps': 0, 'smodel': [], 'append': append, 'docallib': False, 'gaintable': BPGainTables, 'gainfield': [''], 'interp': [''], 'spwmap': [], 'parang': True} job = casa_tasks.bandpass(**bandpass_task_args) return job
[docs]def weakbp(vis, caltable, context=None, RefAntOutput=None, ktypecaltable=None, bpdgain_touse=None, solint=None, append=None, executor=None, spw=''): m = context.observing_run.get_ms(vis) channels = m.get_vla_numchan() # Number of channels before averaging bpjob = do_bandpassweakbp(vis, caltable, context=context, spw=spw, RefAntOutput=RefAntOutput, ktypecaltable=ktypecaltable, bpdgain_touse=bpdgain_touse, solint='inf', append=False) executor.execute(bpjob) (largechunk, spwids) = computeChanFlag(vis, caltable, context) # print largechunk, spwids if not largechunk and spwids == []: # All solutions found - proceed as normal with the pipeline interp = '' return interp LOG.warning("Solutions for all channels not obtained. Using weak bandpass calibration heuristic.") cpa = 2 # Channel pre-averaging while largechunk: LOG.info("Removing rows in table " + caltable + " for spws="+','.join([str(i) for i in spwids])) removeRows(caltable, spwids) solint = 'inf,' + str(cpa) + 'ch' LOG.warning("Largest contiguous set of channels with no BP solution is greater than maximum " + "allowable 1/32 fractional bandwidth for spw="+','.join([str(i) for i in spwids]) + "." + " Using solint=" + solint) LOG.info('Weak bandpass calibration heuristic. Using solint='+solint) bpjob = do_bandpassweakbp(vis, caltable, context=context, RefAntOutput=RefAntOutput, spw=','.join([str(i) for i in spwids]), ktypecaltable=ktypecaltable, bpdgain_touse=bpdgain_touse, solint=solint, append=True) executor.execute(bpjob) (largechunk, spwids) = computeChanFlag(vis, caltable, context) for spw in spwids: preavgnchan = channels[spw]/float(cpa) LOG.debug("CPA: " + str(cpa) + " NCHAN: "+str(preavgnchan)+" NCHAN/32: "+str(preavgnchan/32.0)) if cpa > preavgnchan/32.0: LOG.warn("Limiting pre-averaging to maximum 1/32 fractional bandwidth for spw="+str(spw) + ". Interpolation in applycal will need to extend over greater " + "than 1/32 fractional bandwidth, which may fail to capture significant bandpass structure.") largechunk = False # This will break the while loop and move onto applycal cpa = cpa * 2 LOG.warning("Channel gaps in bandpass solutions will be linearly interpolated over in applycal.") LOG.warning("Accuracy of bandpass solutions will be slightly degraded at interpolated channels, " + "particularly if these fall at spectral window edges where applycal will " + "perform 'nearest' extrapolation.") interp = 'nearest' return interp