Source code for pipeline.hifv.tasks.fluxscale.solint

import collections
import os

import numpy as np

import pipeline.hif.heuristics.findrefant as findrefant
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.hifv.heuristics import getCalFlaggedSoln, uvrange
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import casa_tools
from pipeline.infrastructure import task_registry

LOG = infrastructure.get_logger(__name__)


[docs]class SolintInputs(vdp.StandardInputs): limit_short_solint = vdp.VisDependentProperty(default='') refantignore = vdp.VisDependentProperty(default='') def __init__(self, context, vis=None, limit_short_solint=None, refantignore=None): super(SolintInputs, self).__init__() self.context = context self.vis = vis self.limit_short_solint = limit_short_solint self.refantignore = refantignore self.gain_solint1 = 'int' self.gain_solint2 = 'int'
[docs]class SolintResults(basetask.Results): def __init__(self, final=None, pool=None, preceding=None, longsolint=None, gain_solint2=None, shortsol2=None, short_solint=None, new_gain_solint1=None, vis=None, bpdgain_touse=None): if final is None: final = [] if pool is None: pool = [] if preceding is None: preceding = [] super(SolintResults, self).__init__() self.vis = vis self.pool = pool[:] self.final = final[:] self.preceding = preceding[:] self.error = set() self.longsolint = longsolint self.gain_solint2 = gain_solint2 self.shortsol2 = shortsol2 self.short_solint = short_solint self.new_gain_solint1 = new_gain_solint1 self.bpdgain_touse = bpdgain_touse
[docs] def merge_with_context(self, context): m = context.observing_run.get_ms(self.vis) context.evla['msinfo'][m.name].gain_solint2 = self.gain_solint2 context.evla['msinfo'][m.name].longsolint = self.longsolint context.evla['msinfo'][m.name].shortsol2 = self.shortsol2 context.evla['msinfo'][m.name].short_solint = self.short_solint context.evla['msinfo'][m.name].new_gain_solint1 = self.new_gain_solint1
[docs]@task_registry.set_equivalent_casa_task('hifv_solint') class Solint(basetask.StandardTaskTemplate): Inputs = SolintInputs
[docs] def prepare(self): m = self.inputs.context.observing_run.get_ms(self.inputs.vis) spw2band = m.get_vla_spw2band() band2spw = collections.defaultdict(list) spwobjlist = m.get_spectral_windows(science_windows_only=True) listspws = [spw.id for spw in spwobjlist] for spw, band in spw2band.items(): if spw in listspws: # Science intents only band2spw[band].append(str(spw)) longsolint = {} gain_solint2 = {} shortsol2 = {} short_solint = {} new_gain_solint1 = {} bpdgain_touse = {} calMs = 'calibrators.ms' split_result = self._do_split(calMs) for band, spwlist in band2spw.items(): longsolint_band, gain_solint2_band, shortsol2_band, short_solint_band, \ new_gain_solint1_band, vis, bpdgain_touse_band = self._do_solint(band, spwlist, calMs) longsolint[band] = longsolint_band gain_solint2[band] = gain_solint2_band shortsol2[band] = shortsol2_band short_solint[band] = short_solint_band new_gain_solint1[band] = new_gain_solint1_band bpdgain_touse[band] = bpdgain_touse_band return SolintResults(longsolint=longsolint, gain_solint2=gain_solint2, shortsol2=shortsol2, short_solint=short_solint, new_gain_solint1=new_gain_solint1, vis=vis, bpdgain_touse=bpdgain_touse)
[docs] def analyse(self, results): return results
def _do_solint(self, band, spwlist, calMs): # Solint section (longsolint, gain_solint2) = self._do_determine_solint(calMs, ','.join(spwlist)) try: self.setjy_results = self.inputs.context.results[0].read()[0].setjy_results except Exception as e: self.setjy_results = self.inputs.context.results[0].read().setjy_results try: stage_number = self.inputs.context.results[-1].read()[0].stage_number + 1 except Exception as e: stage_number = self.inputs.context.results[-1].read().stage_number + 1 tableprefix = os.path.basename(self.inputs.vis) + '.' + 'hifv_solint.s' # Testgains section context = self.inputs.context tablebase = tableprefix + str(stage_number) + '_1.' + 'testgaincal' table_suffix = ['_{!s}.tbl'.format(band), '3_{!s}.tbl'.format(band), '10_{!s}.tbl'.format(band), 'scan_{!s}.tbl'.format(band), 'limit_{!s}.tbl'.format(band)] soltimes = [1.0, 3.0, 10.0] m = self.inputs.context.observing_run.get_ms(self.inputs.vis) soltimes = [m.get_vla_max_integration_time() * x for x in soltimes] solints = ['int', str(soltimes[1]) + 's', str(soltimes[2]) + 's'] soltime = soltimes[0] solint = solints[0] shortsol1 = context.evla['msinfo'][m.name].shortsol1[band] combtime = 'scan' refantfield = context.evla['msinfo'][m.name].calibrator_field_select_string self.ignorerefant = self.inputs.context.evla['msinfo'][m.name].ignorerefant refantignore = self.inputs.refantignore + ','.join(self.ignorerefant) refantobj = findrefant.RefAntHeuristics(vis=calMs, field=refantfield, geometry=True, flagging=True, intent='', spw='', refantignore=refantignore) RefAntOutput = refantobj.calculate() refAnt = ','.join(RefAntOutput) bpdgain_touse = tablebase + table_suffix[0] testgains_result = self._do_gtype_testgains(calMs, bpdgain_touse, solint=solint, context=context, combtime=combtime, refAnt=refAnt, spw=','.join(spwlist)) flaggedSolnResult1 = getCalFlaggedSoln(bpdgain_touse) LOG.info("For solint = " + solint + " fraction of flagged solutions = " + str(flaggedSolnResult1['all']['fraction'])) LOG.info("Median fraction of flagged solutions per antenna = " + str(flaggedSolnResult1['antmedian']['fraction'])) if flaggedSolnResult1['all']['total'] > 0: fracFlaggedSolns1 = flaggedSolnResult1['antmedian']['fraction'] else: fracFlaggedSolns1 = 1.0 shortsol2 = soltime if fracFlaggedSolns1 > 0.05: soltime = soltimes[1] solint = solints[1] testgains_result = self._do_gtype_testgains(calMs, tablebase + table_suffix[1], solint=solint, context=context, combtime=combtime, refAnt=refAnt, spw=','.join(spwlist)) flaggedSolnResult3 = getCalFlaggedSoln(tablebase + table_suffix[0]) LOG.info("For solint = " + solint + " fraction of flagged solutions = " + str(flaggedSolnResult3['all']['fraction'])) LOG.info("Median fraction of flagged solutions per antenna = " + str(flaggedSolnResult3['antmedian']['fraction'])) if flaggedSolnResult3['all']['total'] > 0: fracFlaggedSolns3 = flaggedSolnResult3['antmedian']['fraction'] else: fracFlaggedSolns3 = 1.0 if fracFlaggedSolns3 < fracFlaggedSolns1: shortsol2 = soltime bpdgain_touse = tablebase + table_suffix[1] if fracFlaggedSolns3 > 0.05: soltime = soltimes[2] solint = solints[2] testgains_result = self._do_gtype_testgains(calMs, tablebase + table_suffix[2], solint=solint, context=context, combtime=combtime, refAnt=refAnt, spw=','.join(spwlist)) flaggedSolnResult10 = getCalFlaggedSoln(tablebase + table_suffix[2]) LOG.info("For solint = " + solint + " fraction of flagged solutions = " + str(flaggedSolnResult3['all']['fraction'])) LOG.info("Median fraction of flagged solutions per antenna = " + str(flaggedSolnResult3['antmedian']['fraction'])) if flaggedSolnResult10['all']['total'] > 0: fracFlaggedSolns10 = flaggedSolnResult10['antmedian']['fraction'] else: fracFlaggedSolns10 = 1.0 if fracFlaggedSolns10 < fracFlaggedSolns3: shortsol2 = soltime bpdgain_touse = tablebase + table_suffix[2] if fracFlaggedSolns10 > 0.05: solint = 'inf' combtime = '' testgains_result = self._do_gtype_testgains(calMs, tablebase + table_suffix[3], solint=solint, context=context, combtime=combtime, refAnt=refAnt, spw=','.join(spwlist)) flaggedSolnResultScan = getCalFlaggedSoln(tablebase + table_suffix[3]) LOG.info("For solint = " + solint + " fraction of flagged solutions = " + str(flaggedSolnResult3['all']['fraction'])) LOG.info("Median fraction of flagged solutions per antenna = " + str(flaggedSolnResult3['antmedian']['fraction'])) if flaggedSolnResultScan['all']['total'] > 0: fracFlaggedSolnsScan = flaggedSolnResultScan['antmedian']['fraction'] else: fracFlaggedSolnsScan = 1.0 if fracFlaggedSolnsScan < fracFlaggedSolns10: shortsol2 = context.evla['msinfo'][m.name].longsolint bpdgain_touse = tablebase + table_suffix[3] if fracFlaggedSolnsScan > 0.05: LOG.warn("Warning, large fraction of flagged solutions. " + "There might be something wrong with your data") LOG.info("ShortSol1: " + str(shortsol1)) LOG.info("ShortSol2: " + str(shortsol2)) short_solint = max(shortsol1, shortsol2) LOG.info("Short_solint determined from heuristics: " + str(short_solint)) new_gain_solint1 = str(short_solint) + 's' if self.inputs.limit_short_solint: LOG.warn("Short Solint limited by user keyword input to " + str(self.inputs.limit_short_solint)) limit_short_solint = self.inputs.limit_short_solint short_solint_str = "{:.12f}".format(short_solint) if limit_short_solint == 'int': limit_short_solint = '0' combtime = 'scan' short_solint = float(limit_short_solint) new_gain_solint1 = str(short_solint) + 's' elif limit_short_solint == 'inf': combtime = '' short_solint = limit_short_solint new_gain_solint1 = short_solint LOG.warn(" Note that since 'inf' was specified then combine='' for gaincal.") # This comparison needed to change for Python 3 elif str(limit_short_solint) <= short_solint_str: short_solint = float(limit_short_solint) new_gain_solint1 = str(short_solint) + 's' combtime = 'scan' # PIPE-460. Use solint='int' when the minimum solution interval corresponds to one integration # PIPE-696. Need to compare short solint with int time and limit the precision. if short_solint == float("{:.6f}".format(m.get_vla_max_integration_time())): new_gain_solint1 = 'int' LOG.info( 'The short solution interval used is: {!s} ({!s}).'.format(new_gain_solint1, str(short_solint) + 's')) testgains_result = self._do_gtype_testgains(calMs, tablebase + table_suffix[4], solint=new_gain_solint1, context=context, combtime=combtime, refAnt=refAnt, spw=','.join(spwlist)) bpdgain_touse = tablebase + table_suffix[4] LOG.info("Using short solint = " + str(new_gain_solint1)) if abs(longsolint - short_solint) <= soltime: LOG.warn('Short solint = long solint +/- integration time of {}s'.format(soltime)) return longsolint, gain_solint2, shortsol2, short_solint, new_gain_solint1, self.inputs.vis, bpdgain_touse def _do_split(self, calMs): m = self.inputs.context.observing_run.get_ms(self.inputs.vis) calibrator_scan_select_string = self.inputs.context.evla['msinfo'][m.name].calibrator_scan_select_string LOG.info("Splitting out calibrators into " + calMs) task_args = {'vis': m.name, 'outputvis': calMs, 'datacolumn': 'corrected', 'keepmms': True, 'field': '', 'spw': '', # 'width' : int(max(channels)), 'width': 1, 'antenna': '', 'timebin': '0s', 'timerange': '', 'scan': calibrator_scan_select_string, 'intent': '', 'array': '', 'uvrange': '', 'correlation': '', 'observation': '', 'keepflags': False} job = casa_tasks.split(**task_args) return self._executor.execute(job) def _do_determine_solint(self, calMs, spw=''): durations = [] old_spws = [] old_field = '' with casa_tools.MSReader(calMs) as ms: scan_summary = ms.getscansummary() m = self.inputs.context.observing_run.get_ms(self.inputs.vis) phase_scan_list = self.inputs.context.evla['msinfo'][m.name].phase_scan_select_string.split(',') phase_scan_list = [int(i) for i in phase_scan_list] # Sub-select phase scan list per band phase_scanids_perband = [scan.id for scan in m.get_scans(scan_id=phase_scan_list, spw=spw)] for kk in range(len(phase_scan_list)): ii = phase_scan_list[kk] if ii in phase_scanids_perband: try: # Collect beginning and ending times # Take max of end times and min of beginning times endtimes = [scan_summary[str(ii)][scankey]['EndTime'] for scankey in scan_summary[str(ii)]] begintimes = [scan_summary[str(ii)][scankey]['BeginTime'] for scankey in scan_summary[str(ii)]] end_time = max(endtimes) begin_time = min(begintimes) new_spws = scan_summary[str(ii)]['0']['SpwIds'] new_field = scan_summary[str(ii)]['0']['FieldId'] if ((kk > 0) and (phase_scan_list[kk-1] == ii-1) and (set(new_spws) == set(old_spws)) and (new_field == old_field)): # if contiguous scans, just increase the time on the previous one LOG.info("End time, old begin time {} {}".format(end_time, old_begin_time)) durations[-1] = 86400*(end_time - old_begin_time) else: LOG.info("End time, begin time {} {}".format(end_time, begin_time)) durations.append(86400*(end_time - begin_time)) old_begin_time = begin_time LOG.info("append durations, old, begin:".format(durations, old_begin_time, begin_time)) LOG.info("Scan "+str(ii)+" has "+str(durations[-1])+"s on source") old_spws = new_spws old_field = new_field except KeyError: LOG.warn("WARNING: scan "+str(ii)+" is completely flagged and missing from " + calMs) longsolint = (np.max(durations)) * 1.01 gain_solint2 = str(longsolint) + 's' return longsolint, gain_solint2 def _do_gtype_testgains(self, calMs, caltable, solint='int', context=None, combtime='scan', refAnt=None, spw=''): m = self.inputs.context.observing_run.get_ms(self.inputs.vis) calibrator_scan_select_string = context.evla['msinfo'][m.name].calibrator_scan_select_string scanlist = [int(scan) for scan in calibrator_scan_select_string.split(',')] scanids_perband = ','.join([str(scan.id) for scan in m.get_scans(scan_id=scanlist, spw=spw)]) minBL_for_cal = m.vla_minbaselineforcal() task_args = {'vis': calMs, 'caltable': caltable, 'field': '', 'spw': spw, 'intent': '', 'selectdata': True, 'scan': scanids_perband, 'solint': solint, 'combine': combtime, 'preavg': -1.0, 'refant': refAnt.lower(), 'minblperant': minBL_for_cal, 'minsnr': 5.0, 'solnorm': False, 'gaintype': 'G', 'smodel': [], 'calmode': 'ap', 'append': False, 'gaintable': [''], 'gainfield': [''], 'interp': [''], 'spwmap': [], 'uvrange': '', 'parang': True} calscanslist = list(map(int, scanids_perband.split(','))) scanobjlist = m.get_scans(scan_id=calscanslist, scan_intent=['AMPLITUDE', 'BANDPASS', 'POLLEAKAGE', 'POLANGLE', 'PHASE', 'POLARIZATION', 'CHECK']) fieldidlist = [] for scanobj in scanobjlist: fieldobj, = scanobj.fields if str(fieldobj.id) not in fieldidlist: fieldidlist.append(str(fieldobj.id)) for fieldidstring in fieldidlist: fieldid = int(fieldidstring) uvrangestring = uvrange(self.setjy_results, fieldid) task_args['field'] = fieldidstring task_args['uvrange'] = uvrangestring if os.path.exists(caltable): task_args['append'] = True job = casa_tasks.gaincal(**task_args) self._executor.execute(job) return True