import collections
import copy
import functools
import operator
import warnings
from typing import List
import numpy
import scipy.optimize
import pipeline.infrastructure.logging as logging
from . import mswrapper
LOG = logging.get_logger(__name__)
# used to associate ant/pol metadata with the amp/phase best fits
AntennaFit = collections.namedtuple('AntennaFit', 'ant pol amp phase')
# struct to hold best fit parameters for a linear model
LinearFitParameters = collections.namedtuple('LinearFitParameters', 'slope intercept')
# describes an outlier data selection with why it's an outlier, and by how much
Outlier = collections.namedtuple('Outlier', 'vis intent scan spw ant pol num_sigma reason')
# simple 2-tuple to hold a value and the uncertainty in that value
ValueAndUncertainty = collections.namedtuple('FitAndError', 'value unc')
# nsigma thresholds for marking deviating fits as an outlier
AMPLITUDE_SLOPE_THRESHOLD = 10
AMPLITUDE_INTERCEPT_THRESHOLD = 10
PHASE_SLOPE_THRESHOLD = 6.5
PHASE_INTERCEPT_THRESHOLD = 8.4
[docs]def score_all_scans(ms, intent: str, flag_all: bool = False) -> List[Outlier]:
"""
Calculate best fits for amplitude vs frequency and phase vs frequency
for time-averaged visibilities, score each fit by comparison against a
reference value, and return outliers.
Outliers are returned as a list of Outlier objects.
By default, outliers are measured against a PWG-defined threshold for
each fit type. Set flag_all to True to make classify all fits as outliers.
This is useful when testing QA plugin score roll-up functionality.
:param ms: MeasurementSet to process
:param intent: data intent to process
:param flag_all: (optional) True if all fits should be classified as
outliers
:return: outliers that deviate from a reference fit
"""
outliers = []
for scan in sorted(ms.get_scans(scan_intent=intent), key=operator.attrgetter('id')):
spws = sorted([spw for spw in scan.spws if spw.type in ('FDM', 'TDM')],
key=operator.attrgetter('id'))
for spw in spws:
LOG.info('Applycal QA analysis: processing {} scan {} spw {}'.format(ms.basename, scan.id, spw.id))
wrapper = mswrapper.MSWrapper.create_from_ms(ms.name, scan=scan.id, spw=spw.id)
fits = get_best_fits_per_ant(wrapper)
outlier_fn = functools.partial(Outlier,
vis={ms.basename, },
intent={intent, },
spw={spw.id, },
scan={scan.id, })
outliers.extend(score_all(fits, outlier_fn, flag_all))
return outliers
[docs]def calc_vk(wrapper):
"""
Return a NumPy array containing time-averaged visibilities for each
baseline in the input MSWrapper.
:param wrapper: MSWrapper to process
:return:
"""
# find indices of all antennas
antenna1 = set(wrapper['antenna1'])
antenna2 = set(wrapper['antenna2'])
all_antennas = antenna1.union(antenna2)
# Sigma is a function of sqrt(num_antennas - 1). Calculate and cache this value now.
root_num_antennas = numpy.sqrt(len(all_antennas) - 1)
# columns in this list are omitted from V_k
excluded_columns = ['antenna1', 'antenna2', 'corrected_phase', 'flag']
# create a new dtype that adds 'antenna' and 'sigma' columns, filtering out columns we want to omit
column_names = [c for c in wrapper.data.dtype.names if c not in excluded_columns]
result_dtype = [mswrapper.get_dtype(wrapper.data, c) for c in column_names]
result_dtype.insert(0, ('antenna', numpy.int32))
result_dtype.append(('sigma', wrapper['corrected_data'].dtype, wrapper['corrected_data'].shape[1:]))
# get 1D array of channel frequencies and include its definition in the dtype
chan_freq = wrapper.freq_axis['chan_freq']
chan_freq = chan_freq.swapaxes(0, 1)[0]
result_dtype.append(('chan_freq', chan_freq.dtype, chan_freq.shape))
# get 1D array of channel widths and include the column in the dtype
resolution = wrapper.freq_axis['resolution']
resolution = resolution.swapaxes(0, 1)[0]
result_dtype.append(('resolution', resolution.dtype, resolution.shape))
# new numpy array to hold visibilities V
V = numpy.ma.empty((0,), dtype=result_dtype)
for k in all_antennas:
# create new row to hold all data for this antenna
V_k = numpy.ma.empty((1,), dtype=V.data.dtype)
# add antenna and channel frequencies to the row for this antenna
V_k['antenna'] = k
V_k['chan_freq'] = chan_freq
V_k['resolution'] = resolution
# Equation 2: sigma_{k}(nu_{i}) = std(V_{jk}(nu_{i}))_{j} / sqrt(n_{ant})
# select all visibilities created using this antenna.
V_jk = wrapper.xor_filter(antenna1=k, antenna2=k)
sigma_k_real = V_jk['corrected_data'].real.std(axis=0) / root_num_antennas
sigma_k_imag = V_jk['corrected_data'].imag.std(axis=0) / root_num_antennas
V_k['sigma'] = sigma_k_real + 1j * sigma_k_imag
# add the remaining columns
for col in column_names:
V_k[col] = V_jk[col].mean(axis=0)
V = numpy.ma.concatenate((V, V_k), axis=0)
return V
[docs]def get_best_fits_per_ant(wrapper):
"""
Calculate and return the best amp/phase vs freq fits for data in the input
MSWrapper.
This function calculates an independent best fit per polarisation per
antenna, returning a list of AntennaFit objects that characterise the fit
parameters and fit uncertainties per fit.
:param wrapper: MSWrapper to process
:return: a list of AntennaFit objects
"""
V_k = calc_vk(wrapper)
corrected_data = V_k['corrected_data']
sigma = V_k['sigma']
num_antennas, num_pols, num_chans = corrected_data.shape
all_fits = []
for ant in range(num_antennas):
frequencies = V_k['chan_freq'][ant]
bandwidth = numpy.ma.max(frequencies) - numpy.ma.min(frequencies)
band_midpoint = (numpy.ma.max(frequencies) + numpy.ma.min(frequencies)) / 2.0
frequency_scale = 1.0 / bandwidth
amp_model_fn = get_linear_function(band_midpoint, frequency_scale)
ang_model_fn = get_angular_linear_function(band_midpoint, frequency_scale)
for pol in range(num_pols):
visibilities = corrected_data[ant, pol, :]
ta_sigma = sigma[ant, pol, :]
if visibilities.count() == 0:
LOG.info('Could not fit ant {} pol {}: data is completely flagged'.format(ant, pol))
continue
try:
(amp_fit, amp_err) = get_amp_fit(amp_model_fn, frequencies, visibilities, ta_sigma)
amplitude_fit = to_linear_fit_parameters(amp_fit, amp_err)
except TypeError:
# Antenna probably flagged..
LOG.info('Could not fit amplitude vs frequency for ant {} pol {}'.format(ant, pol))
continue
try:
(phase_fit, phase_err) = get_phase_fit(amp_model_fn, ang_model_fn, frequencies, visibilities, ta_sigma)
phase_fit = to_linear_fit_parameters(phase_fit, phase_err)
except TypeError:
# Antenna probably flagged..
LOG.info('Could not fit phase vs frequency for ant {} pol {}'.format(ant, pol))
continue
fit_obj = AntennaFit(ant=ant, pol=pol, amp=amplitude_fit, phase=phase_fit)
all_fits.append(fit_obj)
return all_fits
[docs]def score_all(all_fits, outlier_fn, flag_all: bool = False):
"""
Compare and score the calculated best fits based on how they deviate from
a reference value.
Setting the test argument flag_all to True sets all fits as outliers. This
is useful for testing the QA score roll-up and summary functions in the QA
plugin.
:param all_fits:
:param outlier_fn:
:param flag_all: True if all fits should be classed as outliers
:return:
"""
amp_slope_threshold = 0.0 if flag_all else AMPLITUDE_SLOPE_THRESHOLD
amp_intercept_threshold = 0.0 if flag_all else AMPLITUDE_INTERCEPT_THRESHOLD
phase_slope_threshold = 0.0 if flag_all else PHASE_SLOPE_THRESHOLD
phase_intercept_threshold = 0.0 if flag_all else PHASE_INTERCEPT_THRESHOLD
outliers = []
outliers.extend(score_amp_slope(all_fits, outlier_fn, amp_slope_threshold))
outliers.extend(score_amp_intercept(all_fits, outlier_fn, amp_intercept_threshold))
outliers.extend(score_phase_slope(all_fits, outlier_fn, phase_slope_threshold))
outliers.extend(score_phase_intercept(all_fits, outlier_fn, phase_intercept_threshold))
return outliers
[docs]def score_amp_slope(all_fits, outlier_fn, sigma_threshold):
"""
For all amplitude vs frequency fits, score the slope of the fit against
the slope of the median best fit, marking fits that deviate by
sigma_threshold from the median dispersion as outliers. Identified
outliers are returned as a list of Outlier object returned by the
outlier_fn.
The outlier_fn argument should be a function that returns Outlier objects.
In practice, this function should be a partially-applied Outlier
constructor that requires a more required arguments to be
supplied for an Outlier instance to be created.
:param all_fits: list of all AntennaFit best fit parameters for all metrics
:param outlier_fn: a function returning Outlier objects
:param sigma_threshold: the nsigma threshold to be considered an outlier
:return: list of Outlier objects
"""
return score_X_vs_freq_fits(all_fits, 'amp.slope', get_median_fit, outlier_fn, sigma_threshold)
[docs]def score_amp_intercept(all_fits, outlier_fn, sigma_threshold):
"""
Score the intercept of the best fit against the intercept of the median
best fit, marking fits that deviate by sigma_threshold from the median
dispersion as outliers. Identified outliers are returned as a list of
Outlier object returned by the outlier_fn.
The outlier_fn argument should be a function that returns Outlier objects.
In practice, this function should be a partially-applied Outlier
constructor that requires a more required arguments to be
supplied for an Outlier instance to be created.
:param all_fits: list of all AntennaFit best fit parameters for all metrics
:param outlier_fn: a function returning Outlier objects
:param sigma_threshold: the nsigma threshold to be considered an outlier
:return: list of Outlier objects
"""
return score_X_vs_freq_fits(all_fits, 'amp.intercept', get_median_fit, outlier_fn, sigma_threshold)
[docs]def score_phase_slope(all_fits, outlier_fn, sigma_threshold):
"""
For all phase vs frequency fits, score the slope of the fit against the
slope of the median best fit, marking fits that deviate by sigma_threshold
from the median dispersion as outliers. Identified outliers are returned
as a list of Outlier object returned by the outlier_fn.
The outlier_fn argument should be a function that returns Outlier objects.
In practice, this function should be a partially-applied Outlier
constructor that requires a more required arguments to be
supplied for an Outlier instance to be created.
:param all_fits: list of all AntennaFit best fit parameters for all metrics
:param outlier_fn: a function returning Outlier objects
:param sigma_threshold: the nsigma threshold to be considered an outlier
:return: list of Outlier objects
"""
# phase score is calculated as deviation from zero
return score_X_vs_freq_fits(all_fits, 'phase.slope', PHASE_REF_FN, outlier_fn, sigma_threshold)
[docs]def score_phase_intercept(all_fits, outlier_fn, sigma_threshold):
"""
For all phase vs frequency fits, score the intercept of the fit against
the intercept of the median best fit, marking fits that deviate by
sigma_threshold from the median dispersion as outliers. Identified
outliers are returned as a list of Outlier object returned by the
outlier_fn.
The outlier_fn argument should be a function that returns Outlier objects.
In practice, this function should be a partially-applied Outlier
constructor that requires a more required arguments to be
supplied for an Outlier instance to be created.
:param all_fits: list of all AntennaFit best fit parameters for all metrics
:param outlier_fn: a function returning Outlier objects
:param sigma_threshold: the nsigma threshold to be considered an outlier
:return: list of Outlier objects
"""
# phase score is calculated as deviation from zero
return score_X_vs_freq_fits(all_fits, 'phase.intercept', PHASE_REF_FN, outlier_fn, sigma_threshold)
[docs]def score_X_vs_freq_fits(all_fits, attr, ref_value_fn, outlier_fn, sigma_threshold):
"""
Score a set of best fits, comparing the fits identified by the 'attr'
attribute against a reference value calculated by the ref_value_fn,
marking outliers that deviate by more than sigma_threshold from this
reference value as outliers, to be returned as Outlier objects created by
the outlier_fn.
:param all_fits: a list of fit parameters
:param attr: identifier of the fits to consider, e.g., 'amp.slope'
:param ref_value_fn: a function that takes a list of fits and returns a
value to be used as a reference value in fit comparisons
:param outlier_fn: a function returning Outlier objects
:param sigma_threshold: the nsigma threshold to be considered an outlier
:return: list of Outlier objects
"""
# convert linear fit metadata to a reason that identifies this fit as
# originating from this metric in a wider context, e.g., from 'amp.slope'
# to 'amp_vs_freq.slope'
y_axis, fit_parameter = attr.split('.')
reason = f'{y_axis}_vs_freq.{fit_parameter}'
outlier_fn = functools.partial(outlier_fn, reason={reason, })
accessor = operator.attrgetter(attr)
return score_fits(all_fits, ref_value_fn, accessor, outlier_fn, sigma_threshold)
[docs]def score_fits(all_fits, reference_value_fn, accessor, outlier_fn, sigma_threshold):
"""
Score a list of best fit parameters against a reference value, identifying
outliers as fits that deviate by more than sigma_threshold * std dev from
the reference value.
:param all_fits: list of AntennaFits
:param reference_value_fn: function that returns a reference
ValueAndUncertainty from a list of these objects
:param accessor: function that returns one LinearFitParameters from an
AntennaFit
:param outlier_fn: function that returns an Outlier instance
:param sigma_threshold: threshold nsigma deviation for comparisons
:return: list of Outliers
"""
outliers = []
pols = {f.pol for f in all_fits}
for pol in pols:
pol_fits = [f for f in all_fits if f.pol == pol]
# get reference val. Usually median, could be zero for phase
reference_val, reference_sigma = reference_value_fn(pol_fits, accessor)
for fit in pol_fits:
ant = fit.ant
unc = accessor(fit).unc
value = accessor(fit).value
this_sigma = numpy.sqrt(reference_sigma ** 2 + unc ** 2)
num_sigma = numpy.abs((value - reference_val) / this_sigma)
if num_sigma > sigma_threshold:
outlier = outlier_fn(ant={ant, }, pol={pol, }, num_sigma=num_sigma)
outliers.append(outlier)
return outliers
[docs]def to_linear_fit_parameters(fit, err):
"""
Convert tuples from the best fit evaluation into a LinearFitParameters
namedtuple.
:param fit: 2-tuple of (slope, intercept) best-fit parameters
:param err: 2-tuple of uncertainty in (slope, intercept) parameters
:return:
"""
return LinearFitParameters(slope=ValueAndUncertainty(value=fit[0], unc=err[0]),
intercept=ValueAndUncertainty(value=fit[1], unc=err[1]))
[docs]def get_amp_fit(amp_model_fn, frequencies, visibilities, sigma):
"""
Fit a linear amplitude vs frequency model to a set of time-averaged
visibilities.
:param amp_model_fn: the amplitude linear model to optimise
:param frequencies: numpy array of channel frequencies
:param visibilities: numpy array of time-averaged visibilities
:param sigma: numpy array of uncertainies in time-averaged visibilities
:return: tuple of best fit params, uncertainty tuple
"""
# calculate amplitude and phase from visibility, inc. std. deviations for each
amp = numpy.ma.abs(visibilities)
# angle of complex argument, in radians
sigma_amp = numpy.ma.sqrt((visibilities.real * sigma.real) ** 2 + (visibilities.imag * sigma.imag) ** 2) / amp
sigma_phase = numpy.ma.sqrt((visibilities.imag * sigma.real) ** 2 + (visibilities.real * sigma.imag) ** 2) / (
amp ** 2)
# curve_fit doesn't handle MaskedArrays, so mask out all bad data and
# convert to standard NumPy arrays
mask = numpy.ma.all([amp.mask, sigma_amp <= 0, sigma_phase <= 0], axis=0)
trimmed_frequencies = frequencies[~mask]
trimmed_amp = amp.data[~mask]
trimmed_sigma_amp = sigma_amp.data[~mask]
Cinit = numpy.ma.median(trimmed_amp)
amp_fit, amp_cov = scipy.optimize.curve_fit(amp_model_fn, trimmed_frequencies, trimmed_amp,
p0=[0.0, Cinit], sigma=trimmed_sigma_amp, absolute_sigma=True)
amp_err = numpy.sqrt(numpy.diag(amp_cov))
return amp_fit, amp_err
[docs]def get_phase_fit(amp_model_fn, ang_model_fn, frequencies, visibilities, sigma):
"""
Fit a linear model for phase vs frequency to a set of time-averaged
visibilities.
:param amp_model_fn: model function for amplitude
:param ang_model_fn: model function for phase angle
:param frequencies: numpy array of channel frequencies
:param visibilities: numpy array of time-averaged visibilities
:param sigma: numpy array of uncertainies in time-averaged visibilities
:return: tuple of best fit params, uncertainty tuple
"""
# calculate amplitude and phase from visibility, inc. std. deviations for each
amp = numpy.ma.abs(visibilities)
phase = numpy.ma.angle(visibilities)
zeroamp = (amp.data <= 0.0)
amp.mask[zeroamp] = True
phase.mask[zeroamp] = True
sigma_amp = numpy.ma.sqrt((visibilities.real * sigma.real) ** 2 + (
visibilities.imag * sigma.imag) ** 2) / amp
sigma_phase = numpy.ma.sqrt((visibilities.imag * sigma.real) ** 2 + (
visibilities.real * sigma.imag) ** 2) / (amp ** 2)
# curve_fit doesn't handle MaskedArrays, so mask out all bad data and
# convert to standard NumPy arrays
mask = numpy.ma.all([amp.mask, sigma_amp <= 0, sigma_phase <= 0], axis=0)
trimmed_frequencies = frequencies[~mask]
trimmed_phase = phase.data[~mask]
trimmed_sigma_phase = sigma_phase.data[~mask]
phi_init = numpy.ma.median(trimmed_phase)
# normalise visibilities by amplitude to fit linear angular model
normalised_visibilities = numpy.ma.divide(visibilities, amp)
normalised_sigma = numpy.ma.divide(sigma, amp)
ang_fit_res = fit_angular_model(ang_model_fn, frequencies, normalised_visibilities, normalised_sigma)
# Detrend phases using fit
detrend_model = ang_model_fn(frequencies, -ang_fit_res['x'][0], -ang_fit_res['x'][1])
detrend_data = normalised_visibilities * detrend_model
detrend_phase = numpy.ma.angle(detrend_data)[~mask]
# Refit phases to obtain errors from the same curve_fit method
zerophasefit, phasecov = scipy.optimize.curve_fit(amp_model_fn, trimmed_frequencies, detrend_phase,
p0=[0.0, phi_init - ang_fit_res['x'][1]],
sigma=trimmed_sigma_phase, absolute_sigma=True)
# Final result is detrending model + new fit (close to zero)
phase_fit = ang_fit_res['x'] + zerophasefit
phase_err = numpy.sqrt(numpy.diag(phasecov))
return phase_fit, phase_err
[docs]def get_linear_function(midpoint, x_scale):
"""
Return a scaled linear function (a function of slope and intercept).
:param midpoint:
:param x_scale:
:return:
"""
def f(x, slope, intercept):
return numpy.ma.multiply(
numpy.ma.multiply(slope, numpy.ma.subtract(x, midpoint)),
x_scale
) + intercept
return f
[docs]def get_angular_linear_function(midpoint, x_scale):
"""
Angular linear model to fit phases only.
:param midpoint:
:param x_scale:
:return:
"""
linear_fn = get_linear_function(midpoint, x_scale)
def f(x, slope, intercept):
return numpy.ma.exp(1j * linear_fn(x, slope, intercept))
return f
[docs]def get_chi2_ang_model(angular_model, nu, omega, phi, angdata, angsigma):
m = angular_model(nu, omega, phi)
diff = angdata - m
aux = (numpy.square(diff.real / angsigma.real) + numpy.square(diff.imag / angsigma.imag))[~angdata.mask]
return float(numpy.sum(aux.real))
[docs]def fit_angular_model(angular_model, nu, angdata, angsigma):
f_aux = lambda omega_phi: get_chi2_ang_model(angular_model, nu, omega_phi[0], omega_phi[1], angdata, angsigma)
angle = numpy.ma.angle(angdata[~angdata.mask])
with warnings.catch_warnings():
warnings.simplefilter('ignore', numpy.ComplexWarning)
phi_init = numpy.ma.median(angle)
fitres = scipy.optimize.minimize(f_aux, numpy.array([0.0, phi_init]), method='L-BFGS-B')
return fitres
[docs]def robust_stats(a):
"""
Return median and estimate standard deviation
of numpy array A using median statistics
:param a:
:return:
"""
# correction factor for obtaining sigma from MAD
madfactor = 1.482602218505602
n = len(a)
# Fitting parameters for sample size correction factor b(n)
if n % 2 == 0:
alpha = 1.32
beta = -1.5
else:
alpha = 1.32
beta = -0.9
bn = 1.0 - 1.0 / (alpha * n + beta)
mu = numpy.median(a)
sigma = (1.0 / bn) * madfactor * numpy.median(numpy.abs(a - mu))
return mu, sigma
[docs]def consolidate_data_selections(outliers):
"""
Consolidate a list of Outliers into a smaller set of equivalent Outliers
by consolidating their data selection arguments.
This function works by merging Outliers that have the same list of
reasons.
:param outliers: an iterable of Outliers
:return: an equivalent consolidated list of Outliers
"""
# dict mapping an reason hash to the reason itself:
hash_to_reason = {}
# dict mapping from object hash to corresponding list of Outliers
reason_hash_to_outliers = collections.defaultdict(list)
# create our maps of hashes, which we need to test for overlapping data
# selections
for outlier in outliers:
# create a tuple, as lists are not hashable
reason_hash = tuple([hash(reason) for reason in outlier.reason])
reason_hash_to_outliers[reason_hash].append(outlier)
if reason_hash not in hash_to_reason:
hash_to_reason[reason_hash] = outlier.reason
# dict that maps holds accepted data selections and their reasons
accepted = {}
for reason_hash, outliers in reason_hash_to_outliers.items():
# assemble the other outliers which we will compare for conflicts
other_outliers = []
for v in [v for k, v in reason_hash_to_outliers.items() if k != reason_hash]:
other_outliers.extend(v)
for outlier_to_merge in outliers:
if reason_hash not in accepted:
# first time round for this outlier, therefore it can always
# be added as there will be nothing to merge
accepted[reason_hash] = [copy.deepcopy(outlier_to_merge)]
continue
for existing_outlier in accepted[reason_hash]:
proposed_outlier = copy.deepcopy(existing_outlier)
proposed_outlier.vis.update(outlier_to_merge.vis)
proposed_outlier.intent.update(outlier_to_merge.intent)
proposed_outlier.spw.update(outlier_to_merge.spw)
proposed_outlier.scan.update(outlier_to_merge.scan)
proposed_outlier.ant.update(outlier_to_merge.ant)
proposed_outlier.pol.update(outlier_to_merge.pol)
# if the merged outlier does not conflict with any of the
# explicitly registered outliers that require a different
# reason, then it is safe to add the merged outlier and
# discard the unmerged data selection
if not any((data_selection_contains(proposed_outlier, other) for other in other_outliers)):
LOG.trace('No conflicting outlier detected')
LOG.trace('Accepting merged outlier: {!s}'.format(proposed_outlier))
LOG.trace('Discarding unmerged outlier: {!s}'.format(outlier_to_merge))
accepted[reason_hash] = [proposed_outlier]
break
else:
# we get here if all of the proposed outliers conflict with
# the outlier in hand. In this case, it should be added as it
# stands, completely unaltered.
LOG.trace('Merged outlier conflicts with other registrations')
LOG.trace('Abandoning proposed outlier: {!s}'.format(proposed_outlier))
LOG.trace('Appending new unmerged outlier: {!s}'.format(outlier_to_merge))
unmergeable = outlier_to_merge
accepted[reason_hash].append(unmergeable)
# dict values are lists, which we need to flatten into a single list
result = []
for l in accepted.values():
result.extend(l)
return result
[docs]def data_selection_contains(proposed, ds_args):
"""
Return True if one data selection is contained within another.
:param proposed: data selection 1
:param ds_args: data selection 2
:return: True if data selection 2 is contained within data selection 1
"""
return all([not proposed.vis.isdisjoint(ds_args.vis),
not proposed.intent.isdisjoint(ds_args.intent),
not proposed.scan.isdisjoint(ds_args.scan),
not proposed.spw.isdisjoint(ds_args.spw),
not proposed.ant.isdisjoint(ds_args.ant),
not proposed.pol.isdisjoint(ds_args.pol)])
# The function used to create the reference value for phase vs frequency best fits
# Select this to compare fits against the median
PHASE_REF_FN = get_median_fit
# Select this to compare fits against zero
# PHASE_REF_FN = lambda _, __: ValueAndUncertainty(0, 0)