import collections
import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.callibrary as callibrary
import pipeline.infrastructure.vdp as vdp
from pipeline.h.tasks import applycal as happlycal
from pipeline.hif.tasks import applycal
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry
LOG = infrastructure.get_logger(__name__)
[docs]@task_registry.set_equivalent_casa_task('hifv_applycals')
class Applycals(applycal.IFApplycal):
Inputs = ApplycalsInputs
# Note this is a temporary workaround
antenna_to_apply = '*&*'
[docs] def prepare(self):
# Run applycal
applycal_results = self._do_applycal()
return applycal_results
[docs] def analyse(self, results):
return results
def _do_applycal(self):
result = self.applycal_run()
return result
[docs] def applycal_run(self):
inputs = self.inputs
# Get the target data selection for this task as a CalTo object
calto = callibrary.get_calto_from_inputs(inputs)
# Now get the calibration state for that CalTo data selection. The
# returned dictionary of CalTo:CalFroms specifies the calibrations to
# be applied and the data selection to apply them to.
#
# Note that no 'ignore' argument is given to get_calstate
# (specifically, we don't say ignore=['calwt'] like many other tasks)
# as applycal is a task that can handle calwt and so different values
# of calwt should in this case result in different tasks.
calstate = inputs.context.callibrary.get_calstate(calto)
merged = calstate.merged()
# run a flagdata job to find the flagged state before applycal
if inputs.flagsum:
# 20170406 TN
# flagdata task arguments are indirectly given so that sd applycal task is
# able to edit them
summary_args = dict(vis=inputs.vis, mode='summary')
flagdata_summary_job = casa_tasks.flagdata(**self._get_flagsum_arg(summary_args))
stats_before = self._executor.execute(flagdata_summary_job)
stats_before['name'] = 'before'
if inputs.gainmap:
applycalgroups = self.match_fields_scans()
else:
applycalgroups = collections.defaultdict(list)
applycalgroups['1'] = ['']
jobs = []
for gainfield, scanlist in applycalgroups.items():
for calto, calfroms in merged.items():
# if there's nothing to apply for this data selection, continue
if not calfroms:
continue
# arrange a calibration job for the unique data selection
inputs.spw = calto.spw
inputs.field = calto.field
inputs.intent = calto.intent
args = inputs.to_casa_args()
# Do this a different way ?
args.pop('flagsum', None) # Flagsum is not a CASA applycal task argument
args.pop('flagdetailedsum', None) # Flagdetailedsum is not a CASA applycal task argument
# set the on-the-fly calibration state for the data selection.
calapp = callibrary.CalApplication(calto, calfroms)
# Note this is a temporary workaround ###
args['antenna'] = self.antenna_to_apply
# Note this is a temporary workaround ###
args['gaintable'] = calapp.gaintable
args['gainfield'] = calapp.gainfield
args['spwmap'] = calapp.spwmap
args['interp'] = calapp.interp
args['calwt'] = calapp.calwt
args['applymode'] = inputs.applymode
if inputs.gainmap:
# Determine what tables gainfield should used with if mode='gainmap'
for i, table in enumerate(args['gaintable']):
if 'finalampgaincal' in table or 'finalphasegaincal' in table:
args['interp'][i] = 'linear'
args['gainfield'][i] = gainfield
# args['interp'] = ['', '', '', '', 'linear,linearflag', '', 'linear', 'linear']
# args['gainfield'] = ['','','','','','', gainfield, gainfield]
args['scan'] = ','.join(scanlist)
LOG.info("Using gainfield {!s} and scan={!s}".format(gainfield, ','.join(scanlist)))
args.pop('gainmap', None)
jobs.append(casa_tasks.applycal(**args))
if inputs.gainmap:
for calto, calfroms in merged.items():
# if there's nothing to apply for this data selection, continue
if not calfroms:
continue
# arrange a calibration job for the unique data selection
inputs.spw = calto.spw
inputs.field = calto.field
inputs.intent = calto.intent
args = inputs.to_casa_args()
args['intent'] = 'CALIBRATE*'
# Do this a different way ?
args.pop('flagsum', None) # Flagsum is not a CASA applycal task argument
args.pop('flagdetailedsum', None) # Flagdetailedsum is not a CASA applycal task argument
# set the on-the-fly calibration state for the data selection.
calapp = callibrary.CalApplication(calto, calfroms)
# Note this is a temporary workaround ###
args['antenna'] = self.antenna_to_apply
# Note this is a temporary workaround ###
args['gaintable'] = calapp.gaintable
args['gainfield'] = calapp.gainfield
args['spwmap'] = calapp.spwmap
args['interp'] = calapp.interp
args['calwt'] = calapp.calwt
args['applymode'] = inputs.applymode
args.pop('gainmap', None)
jobs.append(casa_tasks.applycal(**args))
# execute the jobs
for job in jobs:
self._executor.execute(job)
# run a final flagdata job to get the flagging statistics after
# application of the potentially flagged caltables
if inputs.flagsum:
stats_after = self._executor.execute(flagdata_summary_job)
stats_after['name'] = 'applycal'
applied = [callibrary.CalApplication(calto, calfroms)
for calto, calfroms in merged.items()]
result = happlycal.ApplycalResults(applied)
if inputs.flagsum:
result.summaries = [stats_before, stats_after]
# Flagging stats by spw and antenna
if inputs.flagsum and inputs.flagdetailedsum:
ms = self.inputs.context.observing_run.get_ms(inputs.vis)
spws = ms.get_spectral_windows()
spwids = [spw.id for spw in spws]
# Note should intent be set to inputs.intent as shown below or is there
# a reason not to do this.
# fields = ms.get_fields(intent=inputs.intent)
fields = ms.get_fields(intent='BANDPASS,PHASE,AMPLITUDE,CHECK,TARGET')
if 'VLA' in self.inputs.context.project_summary.telescope:
calfields = ms.get_fields(intent='AMPLITUDE,PHASE,BANDPASS')
alltargetfields = ms.get_fields(intent='TARGET')
fields = calfields
Nplots = (len(alltargetfields) // 30) + 1
targetfields = [field for field in alltargetfields[0:len(alltargetfields):Nplots]]
fields.extend(targetfields)
flagsummary = collections.OrderedDict()
flagkwargs = []
for field in fields:
flagsummary[field.name.strip('"')] = {}
for spwid in spwids:
flagline = "spw='" + str(spwid) + "' fieldcnt=True mode='summary' name='AntSpw" + str(spwid).zfill(3)
flagkwargs.append(flagline)
# 20170406 TN
# Tweak flagkwargs (default is do nothing)
flagkwargs = self._tweak_flagkwargs(flagkwargs)
# BRK note - Added kwarg fieldcnt based on Justo's changes, July 2015
# Need to have fieldcnt in the flagline above
flaggingjob = casa_tasks.flagdata(vis=inputs.vis, mode='list', inpfile=flagkwargs, flagbackup=False)
flagdicts = self._executor.execute(flaggingjob)
# BRK note - for Justo's new flagging scheme, need to rearrrange
# the dictionary keys in the order of field, spw report, antenna, with added name and type keys
# on the third dictionary level.
# Set into single dictionary report (single spw) if only one dict returned
if len(flagkwargs) == 1:
flagdictssingle = flagdicts
flagdicts = {}
flagdicts['report0'] = flagdictssingle
for key in flagdicts: # report level
fieldnames = list(flagdicts[key].keys())
fieldnames.remove('name')
fieldnames.remove('type')
for fieldname in fieldnames:
try:
flagsummary[fieldname][key] = flagdicts[key][fieldname]
# TODO: review if this relies on order of keys.
spwid = list(flagdicts[key][fieldname]['spw'].keys())[0]
flagsummary[fieldname][key]['name'] = 'AntSpw' + str(spwid).zfill(3) + 'Field_' + str(fieldname)
flagsummary[fieldname][key]['type'] = 'summary'
except Exception as ex:
LOG.debug("No flags to report for " + str(key) + str(ex))
result.flagsummary = flagsummary
return result
[docs] def match_fields_scans(self):
m = self.inputs.context.observing_run.get_ms(self.inputs.vis)
# Count the number of groups
intentslist = [list(scan.intents) for scan in m.get_scans(scan_intent='PHASE,TARGET')]
intents = []
from itertools import groupby
groups = [list(j) for i, j in groupby(intents)]
primarygroups = [list(set(group))[0] for group in groups]
ngroups = primarygroups.count('TARGET')
targetscans = []
groups = []
phase1 = False
phase2 = False
prev_phase2 = False
TargetGroup = collections.namedtuple("TargetGroup", "phase1 targetscans phase2")
scans = m.get_scans(scan_intent='PHASE,TARGET')
for idx, scan in enumerate(scans):
fieldset = scan.fields
fieldobj = list(fieldset)[0]
fieldid = fieldobj.id
if 'PHASE' in list(scan.intents):
if not phase1 or (phase1 and bool(targetscans) == False):
phase1 = scan
else:
phase2 = scan
prev_phase2 = phase2
targetscans.append(scan)
elif 'TARGET' in list(scan.intents):
if not phase1 or prev_phase2:
phase1 = prev_phase2
targetscans.append(scan)
# Check for consecutive time ranges
# see if this scan is the last one in the relevant scan list
# or see if we have a phase2
# if so, end the group
if phase2 or idx == len(scans)-1:
groups.append(TargetGroup(phase1, targetscans, phase2))
phase1 = False
phase2 = False
targetscans = []
applycalgroups = collections.defaultdict(list)
for idx, group in enumerate(groups):
if group.phase2:
fieldset = group.phase1.fields
fieldobj = list(fieldset)[0]
phase1fieldid = fieldobj.id
fieldset = group.phase2.fields
fieldobj = list(fieldset)[0]
phase2fieldid = fieldobj.id
gainfield = ','.join([str(phase1fieldid), str(phase2fieldid)])
else:
fieldset = group.phase1.fields
fieldobj = list(fieldset)[0]
gainfield = str(fieldobj.id)
targetscans = [str(targetscan.id) for targetscan in group.targetscans]
try:
gainfieldkey = gainfield.split(',')[1]
except Exception as ex:
LOG.debug(str(ex))
gainfieldkey = gainfield.split(',')[0]
applycalgroups[gainfieldkey].extend(targetscans)
for gainfield, scanlist in applycalgroups.items():
print("Applycal Group")
print("\tGainfield.... {}".format(gainfield))
print("\t\tScanList... {}".format(','.join(scanlist)))
print(" ")
return applycalgroups