pipeline.infrastructure.utils package

Submodules

pipeline.infrastructure.utils.conversion module

The conversion module contains utility functions.

The conversion module contains utility functions that convert between data types and assist in formatting objects as strings for presentation to the user.

pipeline.infrastructure.utils.conversion.ant_arg_to_id(ms_path: str, ant_arg: Union[str, int], all_antennas) → List[int][source]

Convert a string to the corresponding antenna IDs.

Parameters
  • ms_path – A path to the measurement set.

  • ant_arg – An antenna selection in CASA format.

  • all_antennas – All antenna domain objects, for use when CASA msselect is disabled.

Returns

A list of antenna IDs.

pipeline.infrastructure.utils.conversion.commafy(l: Sequence[str], quotes: bool = True, multi_prefix: str = '', separator: str = ', ', conjunction: str = 'and') → str[source]

Convert a list of strings into a textual description.

Example:

>>> commafy(['a', 'b', 'c'])
"'a', 'b' and 'c'"

Parameters
  • l – A list of strings.

  • quotes – If True, each element of 'l' is enclosed in quotes.

  • multi_prefix – If 'l' has three or more elements, 'multi_prefix' is attached to the head of the result.

  • separator – Used as the separator instead of ', '.

  • conjunction – Used as the conjunction instead of 'and'.

Returns

The textual description of the given list.
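The behaviour described above can be sketched as follows. This is an illustrative re-implementation rather than the pipeline's own code, and the exact handling of multi_prefix is an assumption based on the parameter description.

```python
def commafy(items, quotes=True, multi_prefix='', separator=', ', conjunction='and'):
    """Join a list of strings into a textual description, e.g. "'a', 'b' and 'c'"."""
    items = list(items)
    if quotes:
        items = ["'%s'" % item for item in items]
    if not items:
        return ''
    if len(items) == 1:
        return items[0]
    # assumption: the prefix is attached only for three or more elements
    prefix = multi_prefix if len(items) >= 3 else ''
    return prefix + separator.join(items[:-1]) + ' ' + conjunction + ' ' + items[-1]
```

For example, commafy(['a', 'b', 'c']) produces "'a', 'b' and 'c'".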

pipeline.infrastructure.utils.conversion.dequote(s: str) → str[source]

Remove any kind of quotes from a string to facilitate comparisons.

Parameters

s – A string.

Returns

The string with all quotes removed.
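A minimal sketch of the described behaviour, assuming "any kind of quotes" means single, double and backtick quote characters; the pipeline's actual implementation may differ.

```python
import re

def dequote(s):
    """Strip single, double and backtick quote characters from a string."""
    return re.sub(r"""['"`]""", '', s)
```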

pipeline.infrastructure.utils.conversion.field_arg_to_id(ms_path: str, field_arg: Union[str, int], all_fields) → List[int][source]

Convert a string to the corresponding field IDs.

Parameters
  • ms_path – A path to the measurement set.

  • field_arg – A field selection in CASA format.

  • all_fields – All Field objects, for use when CASA msselect is not used.

Returns

A list of field IDs.

pipeline.infrastructure.utils.conversion.flatten(l: Sequence[Any]) → Iterator[Any][source]

Flatten a list of lists into a single iterator of elements, without flattening pipelineaq.QAScore objects.

Examples:

>>> obj = flatten([1, 2, [3, 4, [5, 6]], 7])
>>> obj.__next__()
1
>>> obj.__next__()
2
>>> obj.__next__()
3

>>> list(flatten([1, 2, ['c', 4, ['e', 6]], 7]))
[1, 2, 'c', 4, 'e', 6, 7]
Parameters

l – A list that may contain nested lists or other objects.

Yields

The elements of the flattened sequence, one at a time.
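A minimal sketch of a generator with the documented behaviour, assuming only lists and tuples are flattened (so strings and other objects pass through intact); the pipeline version additionally leaves its QAScore objects unflattened.

```python
def flatten(xs):
    """Recursively yield the elements of arbitrarily nested lists and tuples.

    Strings and other non-list objects are yielded as-is.
    """
    for x in xs:
        if isinstance(x, (list, tuple)):
            yield from flatten(x)   # descend into nested containers
        else:
            yield x
```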

pipeline.infrastructure.utils.conversion.format_datetime(dt: datetime.datetime, dp: int = 0) → str[source]

Convert a datetime to a formatted string representation.

Convert a Python datetime object into a string representation, including microseconds to the requested precision.

Parameters
  • dt – Python datetime.

  • dp – A number of decimal places for microseconds (0=do not show).

Returns

Formatted string representation of datetime.
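A sketch of the described behaviour. The exact output format ('%Y-%m-%d %H:%M:%S') is an assumption; only the handling of the dp precision argument follows the description above.

```python
import datetime

def format_datetime(dt, dp=0):
    """Format a datetime, appending fractional seconds to dp decimal places."""
    if not 0 <= dp <= 6:
        raise ValueError('Requested precision exceeds microsecond resolution')
    formatted = dt.strftime('%Y-%m-%d %H:%M:%S')
    if dp > 0:
        fraction = dt.microsecond / 1e6
        formatted += ('%.*f' % (dp, fraction))[1:]  # keep '.xx', drop the leading '0'
    return formatted
```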

pipeline.infrastructure.utils.conversion.format_timedelta(td: datetime.timedelta, dp: int = 0) → str[source]

Convert a timedelta to a formatted string representation.

Convert a Python timedelta object into a string representation, including microseconds to the requested precision.

Parameters
  • td – A timedelta object.

  • dp – The number of decimal places for microseconds (0 = do not show); must be a non-negative integer.

Returns

Formatted string representation of timedelta.

pipeline.infrastructure.utils.conversion.get_epoch_as_datetime(epoch: numbers.Number) → datetime.datetime[source]

Convert a CASA epoch measure into a Python datetime.

Parameters

epoch – CASA epoch measure.

Returns

The equivalent Python datetime.

pipeline.infrastructure.utils.conversion.mjd_seconds_to_datetime(mjd_secs: Sequence[numbers.Number]) → List[datetime.datetime][source]

Convert list of MJD seconds to a list of equivalent datetime objects.

Convert the input list of elapsed seconds since MJD epoch to the equivalent Python datetime objects.

Parameters

mjd_secs – list of elapsed seconds since MJD epoch.

Returns

List of equivalent Python datetime objects.

pipeline.infrastructure.utils.conversion.range_to_list(arg: str) → List[int][source]

Expand a numeric range expressed in CASA syntax to a list of integers.

Expand a numeric range expressed in CASA syntax to the equivalent Python list of integers.

Example:

>>> range_to_list('1~5,7~9')
[1, 2, 3, 4, 5, 7, 8, 9]

Parameters

arg – The numeric range expressed in CASA syntax.

Returns

The equivalent Python list of integers.
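The expansion can be sketched with plain string handling; this is an illustrative equivalent, not the pipeline's implementation (which may defer to CASA's selection parser).

```python
def range_to_list(arg):
    """Expand a CASA range string such as '1~5,7~9' into a list of integers."""
    result = []
    for token in arg.split(','):
        if '~' in token:
            lo, hi = token.split('~')
            result.extend(range(int(lo), int(hi) + 1))  # inclusive range
        elif token:
            result.append(int(token))
    return result
```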

pipeline.infrastructure.utils.conversion.safe_split(fields: str) → List[str][source]

Split a string containing field names into a list.

Split a string containing field names into a list, taking account of field names within quotes.

Parameters

fields – A string containing field names.

Returns

A list of field names, with quoted names kept intact.
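One way to implement such a quote-aware split is with a regular expression; this is a sketch of the behaviour, not necessarily the pipeline's approach.

```python
import re

def safe_split(fields):
    """Split a comma-separated field list, leaving quoted names (which may
    themselves contain commas) intact."""
    # match runs of: non-comma/non-quote chars, or complete quoted strings
    return re.findall(r'''(?:[^,"']|"[^"]*"|'[^']*')+''', fields)
```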

pipeline.infrastructure.utils.conversion.spw_arg_to_id(ms_path: str, spw_arg: Union[str, int], all_spws) → List[Tuple[int, int, int, int]][source]

Convert a string to spectral window IDs and channels.

Parameters
  • ms_path – A path to the measurement set.

  • spw_arg – A spw selection in CASA format.

  • all_spws – List of all SpectralWindow objects, for use when CASA msselect is not used.

Returns

A list of (spw, chan_start, chan_end, step) tuples.

pipeline.infrastructure.utils.conversion.to_CASA_intent(ms, intents: str) → str[source]

Convert pipeline intents back to the equivalent intents recorded in the measurement set.

Example:

> to_CASA_intent(ms, 'PHASE,BANDPASS')
'CALIBRATE_PHASE_ON_SOURCE,CALIBRATE_BANDPASS_ON_SOURCE'

Parameters
  • ms – MeasurementSet object.

  • intents – pipeline intents to convert.

Returns

The equivalent CASA intents recorded in the measurement set.

pipeline.infrastructure.utils.conversion.to_pipeline_intent(ms, intents: str) → str[source]

Convert CASA intents to pipeline intents.

Parameters
  • ms – MeasurementSet object.

  • intents – CASA intents to convert.

Returns

The pipeline intents.

pipeline.infrastructure.utils.conversion_test module

pipeline.infrastructure.utils.conversion_test.test_commafy(inp, kwargs, expected)[source]

Test commafy()

pipeline.infrastructure.utils.conversion_test.test_dequote(inp, expected)[source]

Test dequote()

pipeline.infrastructure.utils.conversion_test.test_flatten(inp, expected)[source]

Test flatten()

pipeline.infrastructure.utils.conversion_test.test_flatten_empty()[source]

Test flatten() with empty input

pipeline.infrastructure.utils.conversion_test.test_format_datetime(inp, kwargs, expected)[source]

Test format_datetime()

pipeline.infrastructure.utils.conversion_test.test_format_datetime_raises_exception_too_high_precision()[source]

Test format_datetime() when requesting too high precision

pipeline.infrastructure.utils.conversion_test.test_format_timedelta(inp, kwargs, expected)[source]

Test format_timedelta()

pipeline.infrastructure.utils.conversion_test.test_format_timedelta_raises_exception_too_high_precision()[source]

Test format_timedelta() when requesting too high precision

pipeline.infrastructure.utils.conversion_test.test_mjd_seconds_to_datetime(inp, expected)[source]

Test mjd_seconds_to_datetime()

pipeline.infrastructure.utils.conversion_test.test_range_to_list(inp, expected)[source]

Test range_to_list()

pipeline.infrastructure.utils.conversion_test.test_safe_split(inp, expected)[source]

Test safe_split()

pipeline.infrastructure.utils.conversion_test.test_unix_seconds_to_datetime(inp, expected)[source]

Test unix_seconds_to_datetime()

pipeline.infrastructure.utils.diagnostics module

The diagnostics module contains utility functions used to help profile the pipeline.

pipeline.infrastructure.utils.diagnostics.enable_fd_logs(interval_secs=60)[source]

Log file descriptors to the CASA log every n seconds.

Parameters

interval_secs – logging cadence in seconds (default=60)

Returns

pipeline.infrastructure.utils.diagnostics.enable_memstats()[source]

pipeline.infrastructure.utils.framework module

The framework module contains:

  1. utility functions used by the pipeline framework;

  2. utility functions used by pipeline tasks to help process framework objects (Results, CalLibrary objects, etc.).

pipeline.infrastructure.utils.framework.collect_properties(instance, ignore=None)[source]

Return the public properties of an object as a dictionary

pipeline.infrastructure.utils.framework.contains_single_dish(context)[source]

Return True if the context contains single-dish data.

Parameters

context – the pipeline context

Returns

True if SD data is present

pipeline.infrastructure.utils.framework.flatten_dict(d, join=<built-in function add>, lift=<function <lambda>>)[source]
pipeline.infrastructure.utils.framework.gen_hash(o)[source]

Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
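The recursive reduction described above can be sketched as follows; an illustrative equivalent, not the pipeline's own code.

```python
def gen_hash(o):
    """Recursively reduce nested containers to a single hash value."""
    if isinstance(o, (list, tuple, set)):
        # hash the tuple of element hashes, preserving order for sequences
        return hash(tuple(gen_hash(element) for element in o))
    if isinstance(o, dict):
        # sort items so that key order does not affect the hash
        return hash(tuple(sorted((key, gen_hash(value)) for key, value in o.items())))
    return hash(o)
```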

pipeline.infrastructure.utils.framework.get_calfroms(context, vis, caltypes=None)[source]

Get the CalFroms of the requested type from the callibrary.

pipeline.infrastructure.utils.framework.get_origin_input_arg(calapp, attr)[source]

Get a single-valued task input argument from a CalApp.

If more than one value is present, for instance, asking for solint when the originating jobs have different solint arguments, an assertion error will be raised.

Parameters
Returns

pipeline.infrastructure.utils.framework.get_qascores(result, lo=None, hi=None)[source]
pipeline.infrastructure.utils.framework.get_tracebacks(result)[source]

Get the tracebacks for the result, from any failures that may have occurred during the task.

Parameters

result – a result or result list.

Returns

list of tracebacks as strings.

pipeline.infrastructure.utils.framework.is_top_level_task()[source]

Return True if the callee is executing as part of a top-level task.

pipeline.infrastructure.utils.framework.merge_jobs(jobs, task, merge=None, ignore=None)[source]

Merge jobs that are identical apart from the arguments named in ignore. These jobs will be recreated with merged arguments.

Identical tasks are identified by creating a hash of the dictionary of task keyword arguments, ignoring keywords specified in the ‘ignore’ argument. Jobs with the same hash can be merged; this is done by appending the spw argument of job X to the spw argument of memoed job Y, whereafter job X can be discarded.

Parameters
  • jobs (a list of JobRequest) – the job requests to merge

  • task (a reference to a function on pipeline.infrastructure.jobrequest.casa_tasks) – the CASA task to recreate

  • ignore (an iterable containing strings) – the task arguments to ignore during hash creation

Return type

a list of JobRequest

pipeline.infrastructure.utils.framework.mkdir_p(path)[source]

Emulate mkdir -p functionality.

pipeline.infrastructure.utils.framework.pickle_copy(original)[source]
pipeline.infrastructure.utils.framework.pickle_load(fileobj)[source]
pipeline.infrastructure.utils.framework.plotms_iterate(jobs_and_wrappers, iteraxis=None)[source]
pipeline.infrastructure.utils.framework.task_depth()[source]

Get the number of executing tasks currently on the stack. If the depth is 1, the calling function is the top-level task.

pipeline.infrastructure.utils.imaging module

The imaging module contains utility functions used by the imaging tasks.

TODO These utility functions should migrate to hif.tasks.common

pipeline.infrastructure.utils.imaging.chan_selection_to_frequencies(img: str, selection: str, unit: str = 'GHz') → Union[List[float], List[str]][source]

Convert channel selection to frequency tuples for a given CASA cube.

Parameters
  • img – CASA cube name

  • selection – Channel selection string using CASA selection syntax

  • unit – Frequency unit

Returns

List of pairs of frequency values (float) in the desired units

pipeline.infrastructure.utils.imaging.equal_to_n_digits(x: float, y: float, numdigits: int = 7) → bool[source]

Approximate equality check up to a given number of digits.

Parameters
  • x – First floating point number

  • y – Second floating point number

  • numdigits – Number of digits to check

Returns

True if x and y are equal up to the given number of digits, otherwise False.

pipeline.infrastructure.utils.imaging.freq_selection_to_channels(img: str, selection: str) → Union[List[int], List[str]][source]

Convert frequency selection to channel tuples for a given CASA cube.

Parameters
  • img – CASA cube name

  • selection – Frequency selection string using CASA syntax

Returns

List of pairs of channel values (int)

pipeline.infrastructure.utils.imaging.intersect_ranges(ranges: List[Tuple[Union[float, int]]]) → Tuple[Union[float, int]][source]

Compute intersection of ranges.

Parameters

ranges – List of tuples defining (frequency) intervals

Returns

Tuple of two numbers defining the intersection

Return type

intersect_range

pipeline.infrastructure.utils.imaging.intersect_ranges_by_weight(ranges: List[Tuple[Union[float, int]]], delta: float, threshold: float) → Tuple[float][source]

Compute intersection of ranges through weight arrays and a threshold.

Parameters
  • ranges – List of tuples defining frequency intervals

  • delta – Frequency step to be used for the intersection

  • threshold – Threshold to be used for the intersection

Returns

Tuple of two numbers defining the intersection

Return type

intersect_range

pipeline.infrastructure.utils.imaging.merge_ranges(ranges: List[Tuple[Union[float, int]]]) → Generator[List[Tuple[float]], None, None][source]

Merge overlapping and adjacent ranges and yield the merged ranges in order. The argument must be an iterable of pairs (start, stop).

Parameters

ranges – List of tuples of two numbers defining ranges

Returns

Generator yielding tuples of merged ranges

>>> list(merge_ranges([(5,7), (3,5), (-1,3)]))
[(-1, 7)]
>>> list(merge_ranges([(5,6), (3,4), (1,2)]))
[(1, 2), (3, 4), (5, 6)]
>>> list(merge_ranges([]))
[]
Credit: Gareth Rees, 02/2013.
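A sketch matching the doctests above; an illustrative equivalent of the described merge.

```python
def merge_ranges(ranges):
    """Yield merged (start, stop) pairs for overlapping or touching input ranges."""
    ranges = sorted(ranges)
    if not ranges:
        return
    start, stop = ranges[0]
    for s, e in ranges[1:]:
        if s <= stop:            # overlapping or adjacent: extend the current range
            stop = max(stop, e)
        else:                    # disjoint: emit the finished range, start a new one
            yield (start, stop)
            start, stop = s, e
    yield (start, stop)
```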

pipeline.infrastructure.utils.imaging.set_nested_dict(dct: Dict, keys: Tuple[Any], value: Any) → None[source]

Set a hierarchy of dictionaries with given keys and value for the lowest level key.

>>> d = {}
>>> set_nested_dict(d, ('key1', 'key2', 'key3'), 1)
>>> print(d)
{'key1': {'key2': {'key3': 1}}}
Parameters
  • dct – Any dictionary

  • keys – List of keys to build hierarchy

  • value – Value for lowest level key

Returns

None. The dictionary is modified in place.
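A compact implementation consistent with the doctest above; a sketch rather than the pipeline's own code.

```python
def set_nested_dict(dct, keys, value):
    """Create intermediate dictionaries as needed and set the innermost key."""
    for key in keys[:-1]:
        dct = dct.setdefault(key, {})  # descend, creating levels on demand
    dct[keys[-1]] = value
```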

pipeline.infrastructure.utils.imaging.spw_intersect(spw_range: List[float], line_regions: List[List[float]]) → List[List[float]][source]

This utility function takes a frequency range (as numbers with arbitrary but common units) and computes the intersection with a list of frequency ranges defining the regions of spectral lines. It returns the remaining ranges excluding the line frequency ranges.

Parameters
  • spw_range – List of two numbers defining the spw frequency range

  • line_regions – List of lists of pairs of numbers defining frequency ranges to be excluded

Returns

List of lists of pairs of numbers defining the remaining frequency ranges
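The interval subtraction described above can be sketched as follows; an illustrative equivalent, not the pipeline's implementation.

```python
def spw_intersect(spw_range, line_regions):
    """Subtract the line regions from the spw range, returning what remains."""
    result = [list(spw_range)]
    for line_lo, line_hi in line_regions:
        remaining = []
        for lo, hi in result:
            if line_hi <= lo or line_lo >= hi:
                remaining.append([lo, hi])        # no overlap: keep untouched
                continue
            if lo < line_lo:
                remaining.append([lo, line_lo])   # piece below the line region
            if line_hi < hi:
                remaining.append([line_hi, hi])   # piece above the line region
        result = remaining
    return result
```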

pipeline.infrastructure.utils.imaging.update_beams_dict(dct: Dict, udct: Dict) → None[source]

Update a beams dictionary. None of the generic solutions tried so far did the job, so this method assumes an explicit dictionary structure of ['<field name>']['<intent>'][<spwids>]: {<beam>}.

Parameters
  • dct – Beams dictionary

  • udct – Beams update dictionary

Returns

None. The main dictionary is modified in place.

pipeline.infrastructure.utils.imaging.update_sens_dict(dct: Dict, udct: Dict) → None[source]

Update a sensitivity dictionary. None of the generic solutions tried so far did the job, so this method assumes an explicit dictionary structure of ['<MS name>']['<field name>']['<intent>'][<spw>]: {<sensitivity result>}.

Parameters
  • dct – Sensitivities dictionary

  • udct – Sensitivities update dictionary

Returns

None. The main dictionary is modified in place.

pipeline.infrastructure.utils.imaging_test module

pipeline.infrastructure.utils.imaging_test.test__get_cube_freq_axis(img, freq_axis)[source]

Test _get_cube_freq_axis()

pipeline.infrastructure.utils.imaging_test.test_chan_selection_to_frequencies(img, selection, unit, frequency_ranges)[source]

Test chan_selection_to_frequencies()

pipeline.infrastructure.utils.imaging_test.test_equal_to_n_digits(x, y, numdigits, result)[source]

Test equal_to_n_digits()

pipeline.infrastructure.utils.imaging_test.test_freq_selection_to_channels(img, selection, channel_ranges)[source]

Test freq_selection_to_channels()

pipeline.infrastructure.utils.imaging_test.test_intersect_ranges(ranges, intersect_range)[source]

Test intersect_ranges()

pipeline.infrastructure.utils.imaging_test.test_intersect_ranges_by_weight(ranges, delta, threshold, intersect_range)[source]

Test intersect_ranges_by_weight()

pipeline.infrastructure.utils.imaging_test.test_merge_ranges(ranges, merged_ranges)[source]

Test merge_ranges()

pipeline.infrastructure.utils.imaging_test.test_set_nested_dict(dct, keys, value, rdct)[source]

Test set_nested_dict()

pipeline.infrastructure.utils.imaging_test.test_spw_intersect(spw_range, line_regions, expected)[source]

Test spw_intersect()

This utility function takes a frequency range (as unitless integers or doubles) and computes the intersection with a list of frequency ranges denoting the regions of spectral lines. It returns the remaining ranges excluding the line frequency ranges.

pipeline.infrastructure.utils.imaging_test.test_update_beams_dict(dct, udct, rdct)[source]

Test update_beams_dict()

pipeline.infrastructure.utils.imaging_test.test_update_sens_dict(dct, udct, rdct)[source]

Test update_sens_dict()

pipeline.infrastructure.utils.math module

pipeline.infrastructure.utils.math.round_half_up(value: Union[int, str], precision: float = 0) → float[source]

Provide the Python 2 rounding behavior.

The behaviour of the round() built-in changed between Python 2 and Python 3. In Python 2, round() rounded to the nearest integer, with ties going away from zero. For example, in Python 2:

[round(a) for a in [-2.5, -1.5, -0.5, 0.5, 1.5, 2.5]] == [-3.0, -2.0, -1.0, 1.0, 2.0, 3.0]

In Python 3, round() uses "banker's rounding": ties go to the nearest even integer, following the IEEE 754 standard for floating-point arithmetic. For example:

>>> [round(a) for a in [-2.5, -1.5, -0.5, 0.5, 1.5, 2.5]]
[-2, -2, 0, 0, 2, 2]

Parameters
  • value – Un-rounded value

  • precision – Precision of un-rounded value to consider when rounding

Returns

The value rounded to the nearest integer, with ties going away from zero.
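The half-away-from-zero behaviour can be reproduced with the standard decimal module; a sketch, not necessarily the pipeline's implementation.

```python
import decimal

def round_half_up(value, precision=0):
    """Round with ties going away from zero, as Python 2's round() did."""
    exponent = decimal.Decimal(10) ** -precision
    # decimal.ROUND_HALF_UP rounds to nearest, ties away from zero
    rounded = decimal.Decimal(str(value)).quantize(
        exponent, rounding=decimal.ROUND_HALF_UP)
    return float(rounded)
```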

pipeline.infrastructure.utils.math_test module

pipeline.infrastructure.utils.math_test.test_round_half_up(unrounded, precision, expected)[source]

Test round_half_up()

This utility function takes an un-rounded scalar value and rounds it to the nearest value, with ties going away from zero, using 'precision' to signify the decimal place to round to.

pipeline.infrastructure.utils.math_test.test_simple()[source]

pipeline.infrastructure.utils.positioncorrection module

Utilities used for correcting image center coordinates.

pipeline.infrastructure.utils.positioncorrection.do_wide_field_pos_cor(fitsname: str, date_time: Optional[Dict] = None, obs_long: Optional[Dict[str, Union[str, float]]] = None, obs_lat: Optional[Dict[str, Union[str, float]]] = None) → None[source]

Applies mean wide field position correction to FITS WCS in place.

Apply a mean correction to the FITS WCS reference position as a function of mean hour angle of observation and mean declination (see PIPE-587, PIPE-700, SRDP-412, and VLASS Memo #14).

The correction is intended for VLASS-QL images. It is performed as part of the hifv_exportvlassdata task call in the VLASS-QL pipeline run. It can also be executed outside of the pipeline.

CRVAL1, CUNIT1, CRVAL2, CUNIT2 and HISTORY keywords are updated in place in the input FITS image header.

Parameters
  • fitsname – name (and path) of FITS file to be processed.

  • date_time – Mean date and time of observation in casa_tools.measure.epoch format; if None, use the DATE-OBS FITS header value. E.g. {'m0': {'unit': 'd', 'value': 58089.83550347222}, 'refer': 'UTC', 'type': 'epoch'}.

  • obs_long – Geographic longitude of the observatory in casa_tools.quanta.quantity format; if None, use the VLA coordinates. E.g. {'value': -107.6, 'unit': 'deg'}.

  • obs_lat – Geographic latitude of the observatory in casa_tools.quanta.quantity format; if None, use the VLA coordinates. E.g. {'value': 34.1, 'unit': 'deg'}.

Example

file = 'VLASS1.1.ql.T19t20.J155950+333000.10.2048.v1.I.iter1.image.pbcor.tt0.subim.fits'
# Mean time of observation
datetime = pipeline.infrastructure.casa_tools.measures.epoch('utc', '2017-12-02T20:03:07.500')
# VLA coordinates
obslong = {'unit': 'deg', 'value': -107.6}
obslat = {'unit': 'deg', 'value': 34.1}
# Correct reference positions in the FITS header
do_wide_field_pos_cor(file, date_time=datetime, obs_long=obslong, obs_lat=obslat)

pipeline.infrastructure.utils.positioncorrection_test module

pipeline.infrastructure.utils.positioncorrection_test.test_calc_wide_field_pos_cor(ra: Dict, dec: Dict, obs_long: Dict, obs_lat: Dict, date_time: Dict, offset_expected: Tuple[Dict, Dict], epsilon: float = 1e-09)[source]

Test calc_wide_field_pos_cor()

This utility function tests the mathematical correctness of the wide-field correction function with edge cases. The tested quantities are the RA and Dec offsets.

pipeline.infrastructure.utils.positioncorrection_test.test_do_wide_field_corr(fitsname: str, obs_long: Dict[str, Union[str, float]], obs_lat: Dict[str, Union[str, float]], url: Optional[str], expected: Tuple[Dict, Dict], epsilon: float = 1e-09)[source]

Test do_wide_field_corr()

This utility function downloads a FITS image and applies wide field position correction to the image reference coordinates (CRVAL1 and CRVAL2). The tested quantities are the corrected RA and Dec values in the FITS header.

If url is not provided, or not available, then assume file already exists in current folder.

The default tolerance (epsilon) value is equivalent to about 0.01 milliarcseconds.

pipeline.infrastructure.utils.ppr module

The ppr module contains utility functions useful to PPR processing functions.

TODO There is a lot of duplication between executeppr and vlaexecuteppr. Can this module be eliminated if a common PPR base class was created?

pipeline.infrastructure.utils.ppr.check_ppr(pprfile)[source]

Check PPR to make sure all tasks exist in the CASA task dictionary.

Compares the <Command> elements of the PPR to the keys of the CASA task dictionary. This is useful as a quick sanity check before executing the pipeline.

Parameters

pprfile – A path to the PPR file (string). e.g. ‘PPR_VLAT003.xml’ or ‘mydata/working/PPR_VLAT003.xml’

Returns

False on error. True on success.

pipeline.infrastructure.utils.ppr.write_errorexit_file(path, root, extension)[source]

pipeline.infrastructure.utils.sorting module

The sorting module contains utility functions used to sort pipeline input and output.

pipeline.infrastructure.utils.sorting.natural_sort(input_list)[source]

Sort a list in natural order, e.g.

>>> natural_sort(["session10", "session10a", "session9", "session1"])
['session1', 'session9', 'session10', 'session10a']
pipeline.infrastructure.utils.sorting.natural_sort_key(x, _nsre=re.compile('([0-9]+)'))[source]

Key function for ordering in natural order with sorted().
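A sketch of such a key function, consistent with the natural_sort() example above; an illustrative equivalent, not the pipeline's own code.

```python
import re

_NUM_RE = re.compile(r'([0-9]+)')

def natural_sort_key(s):
    """Split on digit runs, lower-casing text and converting digits to int."""
    return [int(token) if token.isdigit() else token.lower()
            for token in _NUM_RE.split(s)]
```

Usage: sorted(['session10', 'session10a', 'session9', 'session1'], key=natural_sort_key) yields ['session1', 'session9', 'session10', 'session10a'].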

pipeline.infrastructure.utils.sorting.numeric_sort(input_list)[source]

Sort a list numerically, e.g.

>>> numeric_sort(['9,11,13,15', '11,13', '9'])
['9', '9,11,13,15', '11,13']
pipeline.infrastructure.utils.sorting.numeric_sort_key(s, _nsre=re.compile('([0-9]+)'))[source]

Key function for ordering in numeric order with sorted(). Splits a string by numbers.

pipeline.infrastructure.utils.sorting_test module

Tests for the sorting.py module.

pipeline.infrastructure.utils.sorting_test.test_natural_sort(input_list, expected)[source]

Test natural_sort()

Natural sort orders a list of strings taking into account the numerical values included in the string. It is case insensitive in terms of sorting with respect to the characters.

pipeline.infrastructure.utils.sorting_test.test_natural_sort_key(input_list, expected)[source]

Test natural_sort_key()

This tests the sorting key defined to obtain a natural sort order. It splits a string into a list of elements defined as runs of digits or runs of other characters. The digits are converted to integers and the strings to lower case, allowing a natural sort order when this list is used as the key in the sorted() function.

In its current implementation, natural_sort_key() returns an empty string element at the beginning/end of the list if the input string starts/ends with a number.

pipeline.infrastructure.utils.sorting_test.test_numeric_sort(input_list, expected)[source]

Test numeric_sort()

Numeric sort orders a list of strings taking into account the numerical values included in the string.

pipeline.infrastructure.utils.sorting_test.test_numeric_sort_key(input_list, expected)[source]

Test numeric_sort_key()

This tests the sorting key defined to obtain a numeric sort order. The main difference from natural_sort_key() is that it is case sensitive for string components.

In its current implementation, numeric_sort_key() returns an empty string element at the beginning/end of the list if the input string starts/ends with a number.

pipeline.infrastructure.utils.utils module

The utils module contains general-purpose uncategorised utility functions and classes.

pipeline.infrastructure.utils.utils.approx_equal(x: float, y: float, tol: float = 1e-15) → bool[source]

Return True if two numbers are equal within the given tolerance.

This utility function returns True if two numbers are equal within the given tolerance.

Examples:

>>> approx_equal(1.0e-2, 1.2e-2, 1e-2)
True
>>> approx_equal(1.0e-2, 1.2e-2, 1e-3)
False

pipeline.infrastructure.utils.utils.are_equal(a: Union[List, numpy.ndarray], b: Union[List, numpy.ndarray]) → bool[source]

Return True if the contents of the given arrays are equal.

This utility function checks the equivalence of array-like objects. Two arrays are equal if they have the same number of elements and elements at the same index are equal.

Examples:

>>> are_equal([1, 2, 3], [1, 2, 3])
True
>>> are_equal([1, 2, 3], [1, 2, 3, 4])
False

pipeline.infrastructure.utils.utils.dict_merge(a: Dict, b: Union[Dict, Any]) → Dict[source]

Recursively merge dictionaries.

This utility function recursively merges dictionaries. If the second argument (b) is a dictionary, a copy of the first argument (dictionary a) is created and the elements of b are merged into the new dictionary; otherwise argument b is returned.

Examples:

>>> dict_merge({'a': {'b': 1}}, {'c': 2})
{'a': {'b': 1}, 'c': 2}
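A sketch of the recursive merge described above; an illustrative equivalent, not the pipeline's own code.

```python
def dict_merge(a, b):
    """Return a copy of a with b recursively merged in; a non-dict b replaces a."""
    if not isinstance(b, dict):
        return b
    merged = dict(a)
    for key, value in b.items():
        if key in merged and isinstance(merged[key], dict):
            merged[key] = dict_merge(merged[key], value)  # recurse on nested dicts
        else:
            merged[key] = value                           # b overwrites non-dict values
    return merged
```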

pipeline.infrastructure.utils.utils.find_ranges(data: Union[str, List[int]]) → str[source]

Identify numeric ranges in string or list.

This utility function takes a string or a list of integers (e.g. spectral window lists) and returns a string containing identified ranges.

Examples:

>>> find_ranges([1, 2, 3])
'1~3'
>>> find_ranges('1,2,3,5~12')
'1~3,5~12'
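The range identification can be sketched for the list form of the input; handling of string input containing pre-existing '~' ranges is omitted from this illustrative version.

```python
def find_ranges(data):
    """Collapse runs of consecutive integers into CASA-style 'lo~hi' ranges."""
    nums = sorted(set(data))
    parts = []
    i = 0
    while i < len(nums):
        j = i
        # extend the run while the next number is consecutive
        while j + 1 < len(nums) and nums[j + 1] == nums[j] + 1:
            j += 1
        parts.append(str(nums[i]) if i == j else '%d~%d' % (nums[i], nums[j]))
        i = j + 1
    return ','.join(parts)
```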

pipeline.infrastructure.utils.utils.flagged_intervals(vec: Union[List, numpy.ndarray]) → List[source]

Identify islands of ones in an input array or list.

This utility function finds islands of ones in array or list provided in argument. Used to find contiguous flagged channels in a given spw. Returns a list of tuples with the start and end channels.

Examples:

>>> flagged_intervals([0, 1, 0, 1, 1])
[(1, 1), (3, 4)]
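A sketch of the island-finding using itertools.groupby; an illustrative equivalent, not the pipeline's own code.

```python
import itertools

def flagged_intervals(vec):
    """Return (start, end) index pairs for each contiguous run of ones."""
    intervals = []
    index = 0
    for value, group in itertools.groupby(vec):
        length = len(list(group))
        if value == 1:
            intervals.append((index, index + length - 1))  # inclusive end index
        index += length
    return intervals
```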

pipeline.infrastructure.utils.utils.get_casa_quantity(value: Union[None, Dict, str, float, int]) → Dict[source]

Wrapper around quanta.quantity() that handles None input.

Starting with CASA 6, quanta.quantity() no longer accepts None as input. This utility function handles None values when calling CASA quanta.quantity() tool method.

Returns

A CASA quanta.quantity (dictionary)

Examples:

>>> get_casa_quantity(None)
{'unit': '', 'value': 0.0}
>>> get_casa_quantity('10klambda')
{'unit': 'klambda', 'value': 10.0}

pipeline.infrastructure.utils.utils.get_field_identifiers(ms) → Dict[source]

Maps numeric field IDs to field names.

Get a dict of numeric field ID to unambiguous field identifier, using the field name where possible and falling back to numeric field ID where the name is duplicated, for instance in mosaic pointings.

pipeline.infrastructure.utils.utils.get_num_caltable_polarizations(caltable: str) → int[source]

Obtain number of polarisations from calibration table.

Seemingly the number of QA IDs does not map directly to the number of polarisations for the spw in the MS, but to the number of polarisations for the spw as held in the caltable.

pipeline.infrastructure.utils.utils.get_receiver_type_for_spws(ms, spwids: Sequence) → Dict[source]

Return dictionary of receiver types for requested spectral window IDs.

If a spw ID is not found in the MeasurementSet instance, then its receiver type is set to "N/A".

Parameters
  • ms – MeasurementSet to query for receiver types.

  • spwids – list of spw ids (integers) to query for.

Returns

A dictionary assigning receiver types as values to spwid keys.

pipeline.infrastructure.utils.utils_test module

pipeline.infrastructure.utils.utils_test.test_approx_equal(x: float, y: float, tol: float, expected: bool)[source]

Test approx_equal()

This utility function returns True if two numbers are equal within the given tolerance.

pipeline.infrastructure.utils.utils_test.test_are_equal(a: Union[List, numpy.ndarray], b: Union[List, numpy.ndarray], expected: bool)[source]

Test are_equal()

This utility function check the equivalence of array like objects. Two arrays are equal if they have the same number of elements and elements of the same index are equal.

pipeline.infrastructure.utils.utils_test.test_dict_merge(a: Dict, b: Dict, expected: Dict)[source]

Test dict_merge()

This utility function recursively merges dictionaries. If second argument (b) is a dictionary, then a copy of first argument (dictionary a) is created and the elements of b are merged into the new dictionary. Otherwise return argument b.

In case of matching non-dictionary value keywords, content of dictionary b overwrites that of dictionary a. If the matching keyword value is a dictionary then continue merging recursively.

pipeline.infrastructure.utils.utils_test.test_fieldname_clean(field: str, expected: str)[source]

Test fieldname_clean()

This utility function replaces special characters in string with underscore.

pipeline.infrastructure.utils.utils_test.test_fieldname_for_casa(field: str, expected: str)[source]

Test fieldname_for_casa()

This utility function ensures that field string can be used as CASA argument.

If field contains special characters, then return field string enclose in quotation marks, otherwise return unchanged string.

pipeline.infrastructure.utils.utils_test.test_find_ranges(data: Union[str, list], expected: str)[source]

Test find_ranges()

This utility function takes a string or a list of integers (e.g. spectral window lists) and returns a string containing identified ranges. E.g. [1,2,3] -> ‘1~3’

pipeline.infrastructure.utils.utils_test.test_flagged_intervals(vec: Union[List[int], numpy.ndarray], expected: List[Tuple[int]])[source]

Test flagged_intervals()

This utility function finds islands of ones in vector provided in argument. Used to find contiguous flagged channels in a given spw. Returns a list of tuples with the start and end channels.

pipeline.infrastructure.utils.utils_test.test_get_casa_quantity(value: Optional[Union[str, float, Dict]], expected: Dict)[source]

Test get_casa_quantity()

This utility function handles None values when calling CASA quanta.quantity() tool method.

pipeline.infrastructure.utils.utils_test.test_get_field_accessor(ms, field, expected)[source]

Test get_field_accessor()

This utility function returns an attribute getter. If the field specified in the argument is unique in the MeasurementSet, then the getter will access the field name (name attribute), otherwise the getter will access the field id (id attribute).

pipeline.infrastructure.utils.utils_test.test_get_field_identifiers(ms, expected)[source]

Test get_field_identifiers()

This utility function returns a dictionary with field ID keys and either field name or str(field ID) values. The latter happens when a field name occurs more than once.
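The mapping logic can be sketched like this; the Field namedtuple below is a stand-in for the pipeline's Field domain object, which exposes id and name attributes:

```python
from collections import Counter, namedtuple

def get_field_identifiers(fields):
    # Names that appear only once are unambiguous; otherwise fall back
    # to the stringified field ID.
    name_counts = Counter(f.name for f in fields)
    return {f.id: f.name if name_counts[f.name] == 1 else str(f.id)
            for f in fields}

# Stand-in for the pipeline Field domain object:
Field = namedtuple('Field', ['id', 'name'])
fields = [Field(0, 'J1041+0610'), Field(1, 'M31'), Field(2, 'M31')]
print(get_field_identifiers(fields))  # → {0: 'J1041+0610', 1: '1', 2: '2'}
```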

pipeline.infrastructure.utils.utils_test.test_get_num_caltable_polarizations(caltable: str, expected: int)[source]

Test get_num_caltable_polarizations()

pipeline.infrastructure.utils.utils_test.test_get_receiver_type_for_spws(ms, spwids, expected)[source]

Test get_receiver_type_for_spws()

This utility function returns a dictionary with spectral window IDs (the spwids argument) as keys and the associated receiver strings in the MeasurementSet as values. If a spectral window ID is not found in the MeasurementSet, the associated value is set to 'N/A'.

pipeline.infrastructure.utils.weblog module

The weblog module contains utility functions used by the pipeline web log.

class pipeline.infrastructure.utils.weblog.OrderedDefaultdict(*args, **kwargs)[source]

Bases: collections.OrderedDict

pipeline.infrastructure.utils.weblog.get_intervals(context, calapp, spw_ids=None)[source]

Get the integration intervals for scans processed by a calibration.

The scan and spw selection is formed through inspection of the CalApplication representing the calibration.

Parameters
  • context – the pipeline context

  • calapp – the CalApplication representing the calibration

  • spw_ids – a set of spw IDs to get intervals for. Leave as None to use all spws specified in the CalApplication.

Returns

a list of datetime objects representing the unique scan intervals

pipeline.infrastructure.utils.weblog.get_logrecords(result, loglevel)[source]

Get the log records for the result, removing any duplicates.

Parameters
  • result – a result containing logrecords

  • loglevel – the loglevel to match

Returns

the log records matching the given log level, with duplicates removed

pipeline.infrastructure.utils.weblog.get_vis_from_plots(plots)[source]

Get the name to be used for the MS from the given plots.

Parameters

plots

Returns

pipeline.infrastructure.utils.weblog.merge_td_columns(rows, num_to_merge=None, vertical_align=False)[source]

Merge HTML TD columns with identical values using rowspan.

Parameters
  • rows – a list of tuples, one tuple per row, containing n elements for the n columns.

  • num_to_merge – the number of columns to merge, starting from the left-hand column. Leave as None to merge all columns.

  • vertical_align – set to True to vertically centre any merged cells.

Returns

A list of strings, one string per row, containing TD elements.
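A simplified sketch of the rowspan merging described above. It merges runs of identical values in each of the first num_to_merge columns independently; the pipeline's real implementation may impose additional constraints (e.g. only merging a cell when the cells to its left are also merged):

```python
def merge_td_columns(rows, num_to_merge=None, vertical_align=False):
    num_cols = len(rows[0]) if rows else 0
    if num_to_merge is None:
        num_to_merge = num_cols
    style = ' style="vertical-align:middle;"' if vertical_align else ''
    # spans[r][c] is the rowspan of that cell; 0 means it was swallowed
    # by an identical cell above it.
    spans = [[1] * num_cols for _ in rows]
    for col in range(num_to_merge):
        anchor = 0
        for row in range(1, len(rows)):
            if rows[row][col] == rows[anchor][col]:
                spans[anchor][col] += 1
                spans[row][col] = 0
            else:
                anchor = row
    html = []
    for row_idx, row in enumerate(rows):
        tds = []
        for col, value in enumerate(row):
            span = spans[row_idx][col]
            if span == 0:
                continue  # merged into the cell above
            attr = ' rowspan="{}"'.format(span) if span > 1 else ''
            tds.append('<td{}{}>{}</td>'.format(attr, style, value))
        html.append(''.join(tds))
    return html
```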

pipeline.infrastructure.utils.weblog.total_time_on_source(scans)[source]

Return the total time on source for the given Scans.

Parameters

scans – collection of Scan domain objects

Returns

a datetime.timedelta object set to the total time on source
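The summation is straightforward; the SimpleNamespace objects below stand in for Scan domain objects, which are assumed to expose a time_on_source timedelta attribute:

```python
import datetime
from types import SimpleNamespace

def total_time_on_source(scans):
    # Sum the per-scan on-source durations into one timedelta.
    return sum((scan.time_on_source for scan in scans),
               datetime.timedelta())

# Stand-ins for Scan domain objects:
scans = [SimpleNamespace(time_on_source=datetime.timedelta(seconds=30)),
         SimpleNamespace(time_on_source=datetime.timedelta(seconds=90))]
print(total_time_on_source(scans))  # → 0:02:00
```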

pipeline.infrastructure.utils.weblog.total_time_on_target_on_source(ms, autocorr_only=False)[source]

Return the nominal total time on the target source for the given MeasurementSet, excluding OFF-source (REFERENCE) integrations. Flags are not taken into account.

Background: ALMA-TP observations have integrations of both TARGET and REFERENCE intents within one scan, so Scan.time_on_source does not return the appropriate exposure time in that case.

Parameters
  • ms – MeasurementSet domain object to examine

  • autocorr_only

Returns

a datetime.timedelta object set to the total time on source

Module contents

The utils package contains a set of utility classes and functions that are useful to the pipeline framework and to tasks manipulating pipeline framework objects, Python data types, and CASA data types.

The utils package is intended to be free of any task-specific logic. Code that assumes knowledge or logic beyond that of the task-independent framework should be housed in the h.common package (or hif.common, hifv.common, hsd.common, etc. as appropriate).