Source code for pipeline.hifv.tasks.finalcals.applycals

import collections

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks import applycal as happlycal
from pipeline.hif.tasks import applycal
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry

LOG = infrastructure.get_logger(__name__)


[docs]class ApplycalsInputs(applycal.IFApplycalInputs): """ ApplycalInputs defines the inputs for the Applycal pipeline task. """ def __init__(self, context, output_dir=None, vis=None, # data selection arguments field=None, spw=None, antenna=None, intent=None, # preapply calibrations parang=None, applymode=None, calwt=None, flagbackup=None, flagsum=None, flagdetailedsum=None, gainmap=None): super(ApplycalsInputs, self).__init__(context, output_dir=output_dir, vis=vis, field=field, spw=spw, antenna=antenna, intent=intent, parang=parang, applymode=applymode, flagbackup=flagbackup, flagsum=flagsum, flagdetailedsum=flagdetailedsum) self.calwt = calwt self.gainmap = gainmap parang = vdp.VisDependentProperty(default=True) field = vdp.VisDependentProperty(default='') spw = vdp.VisDependentProperty(default='') intent = vdp.VisDependentProperty(default='') flagbackup = vdp.VisDependentProperty(default=False) calwt = vdp.VisDependentProperty(default=False) gainmap = vdp.VisDependentProperty(default=False)
[docs] def to_casa_args(self): d = super(ApplycalsInputs, self).to_casa_args() d['intent'] = '' d['field'] = '' d['spw'] = '' return d
[docs]@task_registry.set_equivalent_casa_task('hifv_applycals') class Applycals(applycal.IFApplycal): Inputs = ApplycalsInputs # Note this is a temporary workaround antenna_to_apply = '*&*'
[docs] def prepare(self): # Run applycal applycal_results = self._do_applycal() return applycal_results
[docs] def analyse(self, results): return results
def _do_applycal(self): result = self.applycal_run() return result
[docs] def applycal_run(self): inputs = self.inputs # Get the target data selection for this task as a CalTo object calto = callibrary.get_calto_from_inputs(inputs) # Now get the calibration state for that CalTo data selection. The # returned dictionary of CalTo:CalFroms specifies the calibrations to # be applied and the data selection to apply them to. # # Note that no 'ignore' argument is given to get_calstate # (specifically, we don't say ignore=['calwt'] like many other tasks) # as applycal is a task that can handle calwt and so different values # of calwt should in this case result in different tasks. calstate = inputs.context.callibrary.get_calstate(calto) merged = calstate.merged() # run a flagdata job to find the flagged state before applycal if inputs.flagsum: # 20170406 TN # flagdata task arguments are indirectly given so that sd applycal task is # able to edit them summary_args = dict(vis=inputs.vis, mode='summary') flagdata_summary_job = casa_tasks.flagdata(**self._get_flagsum_arg(summary_args)) stats_before = self._executor.execute(flagdata_summary_job) stats_before['name'] = 'before' if inputs.gainmap: applycalgroups = self.match_fields_scans() else: applycalgroups = collections.defaultdict(list) applycalgroups['1'] = [''] jobs = [] for gainfield, scanlist in applycalgroups.items(): for calto, calfroms in merged.items(): # if there's nothing to apply for this data selection, continue if not calfroms: continue # arrange a calibration job for the unique data selection inputs.spw = calto.spw inputs.field = calto.field inputs.intent = calto.intent args = inputs.to_casa_args() # Do this a different way ? args.pop('flagsum', None) # Flagsum is not a CASA applycal task argument args.pop('flagdetailedsum', None) # Flagdetailedsum is not a CASA applycal task argument # set the on-the-fly calibration state for the data selection. calapp = callibrary.CalApplication(calto, calfroms) # Note this is a temporary workaround ### args['antenna'] = self.antenna_to_apply # Note this is a temporary workaround ### args['gaintable'] = calapp.gaintable args['gainfield'] = calapp.gainfield args['spwmap'] = calapp.spwmap args['interp'] = calapp.interp args['calwt'] = calapp.calwt args['applymode'] = inputs.applymode if inputs.gainmap: # Determine what tables gainfield should used with if mode='gainmap' for i, table in enumerate(args['gaintable']): if 'finalampgaincal' in table or 'finalphasegaincal' in table: args['interp'][i] = 'linear' args['gainfield'][i] = gainfield # args['interp'] = ['', '', '', '', 'linear,linearflag', '', 'linear', 'linear'] # args['gainfield'] = ['','','','','','', gainfield, gainfield] args['scan'] = ','.join(scanlist) LOG.info("Using gainfield {!s} and scan={!s}".format(gainfield, ','.join(scanlist))) args.pop('gainmap', None) jobs.append(casa_tasks.applycal(**args)) if inputs.gainmap: for calto, calfroms in merged.items(): # if there's nothing to apply for this data selection, continue if not calfroms: continue # arrange a calibration job for the unique data selection inputs.spw = calto.spw inputs.field = calto.field inputs.intent = calto.intent args = inputs.to_casa_args() args['intent'] = 'CALIBRATE*' # Do this a different way ? args.pop('flagsum', None) # Flagsum is not a CASA applycal task argument args.pop('flagdetailedsum', None) # Flagdetailedsum is not a CASA applycal task argument # set the on-the-fly calibration state for the data selection. calapp = callibrary.CalApplication(calto, calfroms) # Note this is a temporary workaround ### args['antenna'] = self.antenna_to_apply # Note this is a temporary workaround ### args['gaintable'] = calapp.gaintable args['gainfield'] = calapp.gainfield args['spwmap'] = calapp.spwmap args['interp'] = calapp.interp args['calwt'] = calapp.calwt args['applymode'] = inputs.applymode args.pop('gainmap', None) jobs.append(casa_tasks.applycal(**args)) # execute the jobs for job in jobs: self._executor.execute(job) # run a final flagdata job to get the flagging statistics after # application of the potentially flagged caltables if inputs.flagsum: stats_after = self._executor.execute(flagdata_summary_job) stats_after['name'] = 'applycal' applied = [callibrary.CalApplication(calto, calfroms) for calto, calfroms in merged.items()] result = happlycal.ApplycalResults(applied) if inputs.flagsum: result.summaries = [stats_before, stats_after] # Flagging stats by spw and antenna if inputs.flagsum and inputs.flagdetailedsum: ms = self.inputs.context.observing_run.get_ms(inputs.vis) spws = ms.get_spectral_windows() spwids = [spw.id for spw in spws] # Note should intent be set to inputs.intent as shown below or is there # a reason not to do this. # fields = ms.get_fields(intent=inputs.intent) fields = ms.get_fields(intent='BANDPASS,PHASE,AMPLITUDE,CHECK,TARGET') if 'VLA' in self.inputs.context.project_summary.telescope: calfields = ms.get_fields(intent='AMPLITUDE,PHASE,BANDPASS') alltargetfields = ms.get_fields(intent='TARGET') fields = calfields Nplots = (len(alltargetfields) // 30) + 1 targetfields = [field for field in alltargetfields[0:len(alltargetfields):Nplots]] fields.extend(targetfields) flagsummary = collections.OrderedDict() flagkwargs = [] for field in fields: flagsummary[field.name.strip('"')] = {} for spwid in spwids: flagline = "spw='" + str(spwid) + "' fieldcnt=True mode='summary' name='AntSpw" + str(spwid).zfill(3) flagkwargs.append(flagline) # 20170406 TN # Tweak flagkwargs (default is do nothing) flagkwargs = self._tweak_flagkwargs(flagkwargs) # BRK note - Added kwarg fieldcnt based on Justo's changes, July 2015 # Need to have fieldcnt in the flagline above flaggingjob = casa_tasks.flagdata(vis=inputs.vis, mode='list', inpfile=flagkwargs, flagbackup=False) flagdicts = self._executor.execute(flaggingjob) # BRK note - for Justo's new flagging scheme, need to rearrrange # the dictionary keys in the order of field, spw report, antenna, with added name and type keys # on the third dictionary level. # Set into single dictionary report (single spw) if only one dict returned if len(flagkwargs) == 1: flagdictssingle = flagdicts flagdicts = {} flagdicts['report0'] = flagdictssingle for key in flagdicts: # report level fieldnames = list(flagdicts[key].keys()) fieldnames.remove('name') fieldnames.remove('type') for fieldname in fieldnames: try: flagsummary[fieldname][key] = flagdicts[key][fieldname] # TODO: review if this relies on order of keys. spwid = list(flagdicts[key][fieldname]['spw'].keys())[0] flagsummary[fieldname][key]['name'] = 'AntSpw' + str(spwid).zfill(3) + 'Field_' + str(fieldname) flagsummary[fieldname][key]['type'] = 'summary' except Exception as ex: LOG.debug("No flags to report for " + str(key) + str(ex)) result.flagsummary = flagsummary return result
[docs] def match_fields_scans(self): m = self.inputs.context.observing_run.get_ms(self.inputs.vis) # Count the number of groups intentslist = [list(scan.intents) for scan in m.get_scans(scan_intent='PHASE,TARGET')] intents = [] from itertools import groupby groups = [list(j) for i, j in groupby(intents)] primarygroups = [list(set(group))[0] for group in groups] ngroups = primarygroups.count('TARGET') targetscans = [] groups = [] phase1 = False phase2 = False prev_phase2 = False TargetGroup = collections.namedtuple("TargetGroup", "phase1 targetscans phase2") scans = m.get_scans(scan_intent='PHASE,TARGET') for idx, scan in enumerate(scans): fieldset = scan.fields fieldobj = list(fieldset)[0] fieldid = fieldobj.id if 'PHASE' in list(scan.intents): if not phase1 or (phase1 and bool(targetscans) == False): phase1 = scan else: phase2 = scan prev_phase2 = phase2 targetscans.append(scan) elif 'TARGET' in list(scan.intents): if not phase1 or prev_phase2: phase1 = prev_phase2 targetscans.append(scan) # Check for consecutive time ranges # see if this scan is the last one in the relevant scan list # or see if we have a phase2 # if so, end the group if phase2 or idx == len(scans)-1: groups.append(TargetGroup(phase1, targetscans, phase2)) phase1 = False phase2 = False targetscans = [] applycalgroups = collections.defaultdict(list) for idx, group in enumerate(groups): if group.phase2: fieldset = group.phase1.fields fieldobj = list(fieldset)[0] phase1fieldid = fieldobj.id fieldset = group.phase2.fields fieldobj = list(fieldset)[0] phase2fieldid = fieldobj.id gainfield = ','.join([str(phase1fieldid), str(phase2fieldid)]) else: fieldset = group.phase1.fields fieldobj = list(fieldset)[0] gainfield = str(fieldobj.id) targetscans = [str(targetscan.id) for targetscan in group.targetscans] try: gainfieldkey = gainfield.split(',')[1] except Exception as ex: LOG.debug(str(ex)) gainfieldkey = gainfield.split(',')[0] applycalgroups[gainfieldkey].extend(targetscans) for gainfield, scanlist in applycalgroups.items(): print("Applycal Group") print("\tGainfield.... {}".format(gainfield)) print("\t\tScanList... {}".format(','.join(scanlist))) print(" ") return applycalgroups