Source code for pipeline.hifv.tasks.flagging.flagbaddeformatters

import collections

import pipeline.infrastructure as infrastructure
import pipeline.infrastructure.basetask as basetask
import pipeline.infrastructure.vdp as vdp
from pipeline.hifv.heuristics import getBCalStatistics
# from pipeline.hifv.heuristics import getCalFlaggedSoln
from pipeline.infrastructure import casa_tasks
from pipeline.infrastructure import task_registry

LOG = infrastructure.get_logger(__name__)


[docs]class FlagBadDeformattersInputs(vdp.StandardInputs): doflagundernspwlimit = vdp.VisDependentProperty(default=False) def __init__(self, context, vis=None, doflagundernspwlimit=None): super(FlagBadDeformattersInputs, self).__init__() self.context = context self.vis = vis self.doflagundernspwlimit = doflagundernspwlimit
[docs]class FlagBadDeformattersResults(basetask.Results): def __init__(self, jobs=None, result_amp=None, result_phase=None, amp_collection=None, phase_collection=None, num_antennas=None): if amp_collection is None: amp_collection = collections.defaultdict(list) if phase_collection is None: phase_collection = collections.defaultdict(list) if jobs is None: jobs = [] if result_amp is None: result_amp = [] if result_phase is None: result_phase = [] super(FlagBadDeformattersResults, self).__init__() self.jobs = jobs self.result_amp = result_amp self.result_phase = result_phase self.amp_collection = amp_collection self.phase_collection = phase_collection self.num_antennas = num_antennas def __repr__(self): s = 'Bad deformatter results:\n' for job in self.jobs: s += '%s performed. Statistics to follow?' % str(job) return s
[docs]@task_registry.set_equivalent_casa_task('hifv_flagbaddef') class FlagBadDeformatters(basetask.StandardTaskTemplate): Inputs = FlagBadDeformattersInputs
[docs] def prepare(self): # Setting control parameters as method arguments method_args = {'testq': 'amp', # Which quantity to test? ['amp','phase','real','imag'] 'tstat': 'rat', # Which stat to use?['min','max','mean','var']or'rat'=min/max or 'diff'=max-min 'doprintall': True, # Print detailed flagging stats 'testlimit': 0.15, # Limit for test (flag values under/over this limit) 'testunder': True, 'nspwlimit': 4, # Number of spw per baseband to trigger flagging entire baseband 'doflagundernspwlimit': self.inputs.doflagundernspwlimit, # Flag individual spws when below nspwlimit 'doflagemptyspws': False, # Flag data for spws with no unflagged channel solutions in any poln? 'calBPtablename': self.inputs.context.results[-1].read()[0].bpcaltable, # Define the table 'flagreason': 'bad_deformatters_amp or RFI'} # Define the REASON given for the flags (result_amp, amp_collection, num_antennas) = self._do_flag_baddeformatters(**method_args) method_args = {'testq': 'phase', 'tstat': 'diff', 'doprintall': True, 'testlimit': 50, 'testunder': False, 'nspwlimit': 4, 'doflagundernspwlimit': self.inputs.doflagundernspwlimit, 'doflagemptyspws': False, 'calBPtablename': self.inputs.context.results[-1].read()[0].bpcaltable, 'flagreason': 'bad_deformatters_phase or RFI'} (result_phase, phase_collection, num_antennas) = self._do_flag_baddeformatters(**method_args) return FlagBadDeformattersResults(result_amp=result_amp, result_phase=result_phase, amp_collection=amp_collection, phase_collection=phase_collection, num_antennas=num_antennas)
def _do_flag_baddeformatters(self, testq=None, tstat=None, doprintall=True, testlimit=None, testunder=True, nspwlimit=4, doflagundernspwlimit=True, doflagemptyspws=False, calBPtablename=None, flagreason=None): """Determine bad deformatters in the MS and flag them Looks for bandpass solutions that have small ratio of min/max amplitudes """ # Which quantity to test? ['amp','phase','real','imag'] # testq = 'amp' # Which stat to use? ['min','max','mean','var'] or 'rat'=min/max or 'diff'=max-min # tstat = 'rat' # Print detailed flagging stats # doprintall = True # From original script... # Limit for test (flag values under/over this limit) # testlimit = 0.15 # testunder = True # Number of spw per baseband to trigger flagging entire baseband # nspwlimit = 4 # Flag individual spws when below nspwlimit # doflagundernspwlimit = True # Flag data for spws with no unflagged channel solutions in any poln? # doflagemptyspws = False m = self.inputs.context.observing_run.get_ms(self.inputs.vis) num_antennas = len(m.antennas) startdate = m.start_time['m0']['value'] LOG.info("Start date for flag bad deformatters is: " + str(startdate)) if startdate <= 56062.7: doflagdata = False else: doflagdata = True # Define the table to run this on # calBPtablename ='testBPcal.b' # Define the REASON given for the flags # flagreason = 'bad_deformatters_amp or RFI' LOG.info("Will test on quantity: "+testq) LOG.info("Will test using statistic: "+tstat) if testunder: LOG.info("Will flag values under limit = "+str(testlimit)) else: LOG.info("Will flag values over limit = "+str(testlimit)) LOG.info("Will identify basebands with more than "+str(nspwlimit)+" bad spw") if doflagundernspwlimit: LOG.info("Will identify individual spw when less than "+str(nspwlimit)+" bad spw") if doflagemptyspws: LOG.info("Will identify spw with no unflagged channels") LOG.info("Will use flag REASON = "+flagreason) if doflagdata: LOG.info("Will flag data based on what we found") else: LOG.info("Will NOT flag data based on what we found") calBPstatresult = getBCalStatistics(calBPtablename) flaglist = [] extflaglist = [] weblogflagdict = collections.defaultdict(list) for iant in calBPstatresult['antband']: antName = calBPstatresult['antDict'][iant] badspwlist = [] flaggedspwlist = [] for rrx in calBPstatresult['antband'][iant]: for bband in calBPstatresult['antband'][iant][rrx]: # List of spw in this baseband spwl = calBPstatresult['rxBasebandDict'][rrx][bband] nbadspws = 0 badspws = [] flaggedspws = [] if len(spwl) > 0: if doprintall: LOG.info(' Ant %s (%s) %s %s processing spws=%s' % (str(iant), antName, rrx, bband, str(spwl))) for ispw in spwl: testvalid = False if ispw in calBPstatresult['antspw'][iant]: for poln in calBPstatresult['antspw'][iant][ispw]: # Get stats of this ant/spw/poln nbp = calBPstatresult['antspw'][iant][ispw][poln]['inner']['number'] if nbp > 0: if tstat == 'rat': bpmax = calBPstatresult['antspw'][iant][ispw][poln]['inner'][testq]['max'] bpmin = calBPstatresult['antspw'][iant][ispw][poln]['inner'][testq]['min'] if bpmax == 0.0: tval = 0.0 else: tval = bpmin/bpmax elif tstat == 'diff': bpmax = calBPstatresult['antspw'][iant][ispw][poln]['inner'][testq]['max'] bpmin = calBPstatresult['antspw'][iant][ispw][poln]['inner'][testq]['min'] tval = bpmax-bpmin else: # simple test on quantity tval = calBPstatresult['antspw'][iant][ispw][poln]['inner'][testq][tstat] if not testvalid: testval = tval testvalid = True elif testunder: if tval < testval: testval = tval else: if tval > testval: testval = tval # Test on extrema of the polarizations for this ant/spw if not testvalid: # these have no unflagged channels in any poln flaggedspws.append(ispw) else: if (testunder and testval < testlimit) or (not testunder and testval > testlimit): nbadspws += 1 badspws.append(ispw) if doprintall: LOG.info(' Found Ant %s (%s) %s %s spw=%s %s %s=%6.4f' % (str(iant), antName, rrx, bband, str(ispw), testq, tstat, testval)) else: # this spw is missing from this antenna/rx if doprintall: LOG.info(' Ant %s (%s) %s %s spw=%s missing solution' % (str(iant), antName, rrx, bband, str(ispw))) # Test to see if this baseband should be entirely flagged if nbadspws > 0 and nbadspws >= nspwlimit: # Flag all spw in this baseband bbspws = calBPstatresult['rxBasebandDict'][rrx][bband] badspwlist.extend(bbspws) LOG.info('Ant %s (%s) %s %s bad baseband spws=%s' % (str(iant), antName, rrx, bband, str(bbspws))) elif nbadspws > 0 and doflagundernspwlimit: # Flag spws individually badspwlist.extend(badspws) LOG.info('Ant %s (%s) %s %s bad spws=%s' % (str(iant), antName, rrx, bband, str(badspws))) if len(flaggedspws) > 0: flaggedspwlist.extend(flaggedspws) LOG.info('Ant %s (%s) %s %s no unflagged solutions spws=%s ' % (str(iant), antName, rrx, bband, str(flaggedspws))) if len(badspwlist) > 0: spwstr = '' for ispw in badspwlist: if spwstr == '': spwstr = str(ispw) else: spwstr += ','+str(ispw) # # reastr = 'bad_deformatters' reastr = flagreason # Add entry for this antenna # flagstr = "mode='manual' antenna='"+str(iant)+"' spw='"+spwstr+"' reason='"+reastr+"'" # Use name for flagging flagstr = "mode='manual' antenna='"+antName+"' spw='"+spwstr+"'" flaglist.append(flagstr) weblogflagdict[antName].append(spwstr) if doflagemptyspws and len(flaggedspwlist) > 0: spwstr = '' for ispw in flaggedspwlist: if spwstr == '': spwstr = str(ispw) else: spwstr += ','+str(ispw) # # Add entry for this antenna reastr = 'no_unflagged_solutions' # flagstr = "mode='manual' antenna='"+str(iant)+"' spw='"+spwstr+"' reason='"+reastr+"'" # Use name for flagging flagstr = "mode='manual' antenna='"+antName+"' spw='"+spwstr+"'" extflaglist.append(flagstr) weblogflagdict[antName].append(spwstr) # Get basebands matched with spws. spws is a single element list with a single csv string tempDict = {} for antNamekey, spws in weblogflagdict.items(): basebands = [] for spwstr in spws[0].split(','): spw = m.get_spectral_window(spwstr) basebands.append(spw.name.split('#')[0] + ' ' + spw.name.split('#')[1]) basebands = list(set(basebands)) # Unique basebands tempDict[antNamekey] = {'spws': spws, 'basebands': basebands} weblogflagdict = tempDict nflagcmds = len(flaglist) + len(extflaglist) if nflagcmds < 1: LOG.info("No bad basebands/spws found") else: LOG.info("Possible bad basebands/spws found:") for flagstr in flaglist: LOG.info(" "+flagstr) if len(extflaglist) > 0: LOG.info(" ") for flagstr in extflaglist: LOG.info(" "+flagstr) flaglist.extend(extflaglist) if doflagdata: LOG.info("Flagging bad deformatters in the ms...") task_args = {'vis': self.inputs.vis, 'mode': 'list', 'action': 'apply', 'inpfile': flaglist, 'savepars': True, 'flagbackup': True} job = casa_tasks.flagdata(**task_args) self._executor.execute(job) # get the total fraction of data flagged for all antennas # flagging_stats = getCalFlaggedSoln(calBPtablename) # total = 0 # flagged = 0 # for antenna in flagging_stats['ant']: # for pol in flagging_stats['ant'][antenna]: # flagged += flagging_stats['ant'][antenna][pol]['flagged'] # total += flagging_stats['ant'][antenna][pol]['total'] # fraction_flagged = flagged / total # LOG.info('Flagged ({}) Total ({}) Fraction ({})'.format(flagged, total, fraction_flagged)) return flaglist, weblogflagdict, num_antennas # If the flag commands are not executed. return [], collections.defaultdict(list), num_antennas
[docs] def analyse(self, results): return results