pipeline.infrastructure package

Subpackages

Submodules

pipeline.infrastructure.adapters module

class pipeline.infrastructure.adapters.Adapter(heuristic)[source]

Bases: pipeline.infrastructure.api.Heuristic

Adapter is a base class for heuristic adapters.

pipeline.infrastructure.api module

The API module defines the common classes around which the pipeline is structured.

A Task represents a coherent step in the data reduction process. A Task may be composed of several other Tasks to create a 'super task'; such a super task could, for example, chain together several related steps in order to select the best method to use for a particular step in the reduction process.

In addition, a Task may be composed of one or more Heuristics. Within the pipeline, a heuristic is a piece of code used to analyse data and/or to influence a pipeline decision, such as whether to stop processing or whether to run again with different parameters. Where appropriate, this code is extracted from the main body of the pipeline code and exposed as a Heuristic.

Something about Task parameters goes here

If a user wants to do more than simply override a Heuristic-derived value with a Task parameter, it is anticipated that they will edit the appropriate Heuristic, either to tweak the existing implementation or to introduce a new algorithm into the pipeline. Concrete Heuristic implementations can be found in the pipeline.heuristics package.

Inputs, in a general sense, are considered as the mandatory arguments for a pipeline Task. Tasks and Inputs are closely aligned, and just as Tasks differ in functionality, so the number and type of Inputs arguments will differ from implementation to implementation. Refer to the documentation of the implementing subclass for definitive information on the Inputs for a specific Task.

It is anticipated that the Inputs for a Task will be created using the Task.Inputs reference rather than locating and instantiating the partner class directly, eg.:

i = ImplementingTask.Inputs.create_from_context(context)
class pipeline.infrastructure.api.Heuristic[source]

Bases: object

Heuristic is the superclass of all user-accessible heuristics code in the pipeline.

A heuristic is a small, self-contained piece of code that calculates the optimal value(s) for a particular task or argument. Tasks may be composed of multiple heuristics; while the Task and Heuristic may be closely coupled, the Heuristic itself should not depend on the calling task, allowing it to be reused in other tasks.

Examples of heuristics are functions to score caltables, allowing them to be ranked, or a function to calculate the optimal solution interval for a particular measurement set.

abstract calculate(*args, **parameters)[source]

Make a calculation based on the given parameters.

This is an abstract method and must be implemented by all Heuristic subclasses.

Note

The signature and return types of calculate() are intended to be implementation specific. Refer to the documentation of the implementing class for the appropriate signature.
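As an illustration, a hedged sketch of a concrete subclass; the class name, signature and values below are hypothetical and not part of the pipeline:

from pipeline.infrastructure import api

class MedianSolint(api.Heuristic):
    # hypothetical heuristic: choose the median of candidate solution intervals
    def calculate(self, solints):
        ordered = sorted(solints)
        return ordered[len(ordered) // 2]

best = MedianSolint().calculate([2.0, 4.0, 8.0])  # -> 4.0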

class pipeline.infrastructure.api.ImagingMeasurementSetsPreferred[source]

Bases: object

Class used to register Inputs classes that prefer to see post-mstransform data when available.

class pipeline.infrastructure.api.Inputs[source]

Bases: object

Inputs defines the interface used to create a constructor argument for a Task.

to_casa_args()[source]

Get this Inputs object as a dictionary of CASA arguments.

Return type

a dictionary of CASA arguments

class pipeline.infrastructure.api.ResultRenderer[source]

Bases: object

ResultRenderer is the interface for task-specific weblog renderers (T2-4M details pages in weblog nomenclature).

Every ResultRenderer implementation must define two things: the task whose results the implementation renders, and the ‘render’ implementation itself.

The main weblog renderer queries the abstract base class registrations to find all ResultRenderers. Multiple results renderers may be registered for a single task; the main renderer will select the appropriate renderer based on sort order.

abstract render(context)[source]
abstract property task

The result class this renderer should handle.

class pipeline.infrastructure.api.Results[source]

Bases: object

Results defines the interface used to hold the output of a Task plus some common parameters used by all weblog templates. This class is expected to be the base class of a more specialised task-specific class. Refer to the sub-class for details on the structure expected by the task-specific weblog template.

task_class

the Class of the Task that generated this Results object.

inputs

the Inputs used by the Task that generated this Results

timestamps

the Timestamps named tuple holding the start and end timestamps for the Task

stage_number

the stage number of the task that generated this Results

metadata

the dict holding metadata describing this result and the generating Task

abstract accept(context)[source]

Accept these results, registering objects with the context and incrementing stage counters as necessary in preparation for the next task.

abstract property metadata

Object holding metadata describing this result and the generating task. This information is primarily for presentation and the web log, but it could be used for other purposes in future.

Note: currently implemented as a Dict, but this could evolve to a more structured or hierarchical object depending on how much information we need to store.

abstract property uuid

The unique identifier for this results object.

class pipeline.infrastructure.api.Task[source]

Bases: object

The Task interface should be implemented by any class whose instances are intended to be executed by an Executor.

Duck typing means that implementing classes need not extend this ‘interface’, but doing so registers that class as an implementation with the Python abstract base class mechanism. Future versions of the pipeline may query these registrations.

Implementing classes: pipeline.tasks.TaskTemplate, pipeline.infrastructure.JobRequest

abstract property Inputs

A reference to the accompanying Inputs partner class that comprises the mandatory arguments for this Task.

abstract execute(dry_run=False, **parameters)[source]

Run this task and return the Results.

The contract of the method execute is that it may take any action whatsoever, providing that it returns an instance implementing Results that summarises those actions.

Parameters

dry_run (boolean) – when set to True, run the Task in dry-run mode, logging the operations that would be performed without actually executing them.

pipeline.infrastructure.argmapper module

Code in the argmapper module transforms CASA input arguments to pipeline Inputs arguments.

CASA arguments may be named differently from their corresponding arguments in the pipeline classes. This module defines how those arguments should be renamed so that they are acceptable to the Inputs constructors, and converts CASA null values (‘’, [], etc.) to their pipeline equivalents.

pipeline.infrastructure.argmapper.convert_args(pipeline_cls, casa_args, convert_nulls=True)[source]

Convert CASA arguments to pipeline Inputs arguments.

This function converts a dictionary of CASA arguments to the corresponding dictionary of pipeline Inputs arguments by doing the following:

  1. Rename CASA arguments to their pipeline equivalents.

  2. Remove any arguments not accepted by the Inputs constructor.

  3. Convert any CASA null values to pipeline null values.

Parameters
  • pipeline_cls (class) – the pipeline Task class

  • casa_args (dict) – the dictionary of CASA arguments and values

Return type

dict
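A hedged usage sketch of the three steps above; SomeTask and SomeInputs are hypothetical stand-ins for a real pipeline Task and its Inputs partner, and the argument values are illustrative:

from pipeline.infrastructure import argmapper

class SomeInputs(object):
    # hypothetical Inputs whose constructor accepts vis and spw
    def __init__(self, context, vis=None, spw=None):
        self.context, self.vis, self.spw = context, vis, spw

class SomeTask(object):
    Inputs = SomeInputs

# rename the CASA arguments, drop any not accepted by the Inputs
# constructor, and convert the CASA null value ('') to its pipeline equivalent
casa_args = {'vis': 'uid___X02_X3d737_X1.ms', 'spw': ''}
inputs_args = argmapper.convert_args(SomeTask, casa_args)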

pipeline.infrastructure.argmapper.inputs_to_casa(pipeline_cls, args)[source]
pipeline.infrastructure.argmapper.task_to_casa(taskname, task_args)[source]

pipeline.infrastructure.basetask module

class pipeline.infrastructure.basetask.Executor(context, dry_run=True)[source]

Bases: object

execute(*args, **kw)[source]
class pipeline.infrastructure.basetask.FailedTask(context)[source]

Bases: pipeline.infrastructure.basetask.StandardTaskTemplate

class pipeline.infrastructure.basetask.FailedTaskResults(origtask_cls, exception, tb)[source]

Bases: pipeline.infrastructure.basetask.Results

FailedTaskResults represents a results object for a task that encountered an exception during execution.

class pipeline.infrastructure.basetask.ModeTask(inputs)[source]

Bases: pipeline.infrastructure.api.Task

execute(dry_run=True, **parameters)[source]

Run this task and return the Results.

The contract of the method execute is that it may take any action whatsoever, providing that it returns an instance implementing Results that summarises those actions.

Parameters

dry_run (boolean) – when set to True, run the Task in dry-run mode, logging the operations that would be performed without actually executing them.

is_multi_vis_task = False
class pipeline.infrastructure.basetask.Results[source]

Bases: pipeline.infrastructure.api.Results

Results is the base implementation of the Results API.

In practice, all results objects should subclass this object to take advantage of the shared functionality.

accept(context=None)[source]

Accept these results, registering objects with the context and incrementing stage counters as necessary in preparation for the next task.

merge_with_context(context)[source]

Merge these results with the given context.

This method will be called during the execution of accept(). For calibration tasks, a typical implementation will register caltables with the pipeline callibrary.

At this point the result is deemed safe to merge, so no further checks on the context need be performed.

Parameters

context (Context) – the target Context

property metadata

Object holding presentation-related values destined for the web log

property uuid

The unique identifier for this results object.

class pipeline.infrastructure.basetask.ResultsList(results=None)[source]

Bases: pipeline.infrastructure.basetask.Results

accept(context=None)[source]

Accept these results, registering objects with the context and incrementing stage counters as necessary in preparation for the next task.

append(other)[source]
extend(other)[source]
merge_with_context(context)[source]

Merge these results with the given context.

This method will be called during the execution of accept(). For calibration tasks, a typical implementation will register caltables with the pipeline callibrary.

At this point the result is deemed safe to merge, so no further checks on the context need be performed.

Parameters

context (Context) – the target Context

class pipeline.infrastructure.basetask.ResultsProxy(context)[source]

Bases: object

read()[source]

Read the pickle from disk, returning the unpickled object.

write(result)[source]

Write the pickled result to disk.

class pipeline.infrastructure.basetask.StandardTaskTemplate(inputs)[source]

Bases: pipeline.infrastructure.api.Task

StandardTaskTemplate is a template class for pipeline reduction tasks whose execution can be described by a common four-step process:

  1. prepare(): examine the measurement set and prepare a list of intermediate job requests to be executed.

  2. execute the jobs.

  3. analyse(): analyse the output of the intermediate job requests, deciding if necessary which parameters provide the best results, and return these results.

  4. return a final list of jobs to be executed using these best-fit parameters.

StandardTaskTemplate implements the Task interface and steps 2 and 4 in the above process, leaving subclasses to implement prepare() and analyse().

A Task and its Inputs are closely aligned. It is anticipated that the Inputs for a Task will be created using the Task.Inputs reference rather than locating and instantiating the partner class directly, eg.:

i = ImplementingTask.Inputs.create_from_context(context)
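A minimal sketch of a subclass, under stated assumptions: ExampleInputs and ExampleResults are hypothetical partner classes, and jobs are assumed to be run through the executor attribute provided by the template:

from pipeline.infrastructure import api, basetask, casa_tasks

class ExampleInputs(api.Inputs):
    # hypothetical Inputs partner class for ExampleTask
    def __init__(self, context, vis=None):
        self.context = context
        self.vis = vis

class ExampleResults(basetask.Results):
    # hypothetical Results subclass holding the flagging summary
    def __init__(self, summary):
        super().__init__()
        self.summary = summary

class ExampleTask(basetask.StandardTaskTemplate):
    Inputs = ExampleInputs

    def prepare(self):
        # step 1: create an intermediate job request, which the
        # template's executor runs (step 2)
        job = casa_tasks.flagdata(vis=self.inputs.vis, mode='summary')
        summary = self._executor.execute(job)
        return ExampleResults(summary)

    def analyse(self, result):
        # step 3: inspect the job output; this sketch needs no follow-up jobs
        return result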
class HeadTail(head, tail)

Bases: tuple

property head

Alias for field number 0

property tail

Alias for field number 1

abstract analyse(result)[source]

Determine the best parameters by analysing the given jobs before returning any final jobs to execute.

Parameters

result (a list of JobRequest) – the job requests generated by prepare()

Return type

Result

execute(*args, **kw)[source]

Run this task and return the Results.

The contract of the method execute is that it may take any action whatsoever, providing that it returns an instance implementing Results that summarises those actions.

Parameters

dry_run (boolean) – when set to True, run the Task in dry-run mode, logging the operations that would be performed without actually executing them.

is_multi_vis_task = False
abstract prepare(**parameters)[source]

Prepare job requests for execution.

Parameters

parameters – the parameters to pass through to the subclass. Refer to the implementing subclass for specific information on what these parameters are.

Return type

a class implementing Result

class pipeline.infrastructure.basetask.Timestamps(start, end)

Bases: tuple

property end

Alias for field number 1

property start

Alias for field number 0

pipeline.infrastructure.basetask.capture_log(method)[source]
pipeline.infrastructure.basetask.property_with_default(name, default, doc=None)[source]

Return a property whose value is reset to a default value when setting the property value to None.
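A hedged sketch of its use; DemoInputs and the niter attribute are hypothetical:

from pipeline.infrastructure import basetask

class DemoInputs(object):
    # assigning None to this property restores the default value of 100
    niter = basetask.property_with_default('niter', 100)

inputs = DemoInputs()
inputs.niter = 500   # explicit value
inputs.niter = None  # resets the property to its default, 100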

pipeline.infrastructure.basetask.result_finaliser(method)[source]

Copy some useful properties to the Results object before returning it. This is used in conjunction with execute(), where the Result could be returned from a number of places but we don’t want to set the properties in each location.

TODO: refactor so this is done as part of execute!

pipeline.infrastructure.basetask.timestamp(method)[source]
pipeline.infrastructure.basetask.write_pipeline_casa_tasks(context)[source]

Write the equivalent pipeline CASA tasks for results in the context to a file.

pipeline.infrastructure.callibrary module

class pipeline.infrastructure.callibrary.CalAppOrigin(task, inputs)

Bases: tuple

property inputs

Alias for field number 1

property task

Alias for field number 0

class pipeline.infrastructure.callibrary.CalApplication(calto, calfrom, origin=None)[source]

Bases: object

CalApplication maps calibration tables and their application arguments to a target data selection, encapsulated as CalFrom and CalTo objects respectively.

calto

the CalTo representing the data selection to which the calibration should apply.

calfrom

the CalFrom representing the calibration and application parameters

origin

the CalAppOrigin marking how this calibration was created

property antenna

The antennas to which the calibrations apply.

Return type

string

as_applycal()[source]

Get a representation of this object as a CASA applycal call.

Return type

string

property calwt

The calwt parameters to be used when applying these calibrations.

Return type

a scalar string if representing 1 calibration, otherwise a list of strings

exists()[source]

Test whether all calibration tables referred to by this application exist.

Return type

boolean

property field

The fields to which the calibrations apply.

Return type

string

static from_export(s)[source]

Unmarshal a CalApplication from a string.

Return type

the unmarshalled CalApplication object

property gainfield

The gainfield parameters to be used when applying these calibrations.

Return type

a scalar string if representing 1 calibration, otherwise a list of strings

property gaintable

The gaintable parameters to be used when applying these calibrations.

Return type

a scalar string if representing 1 calibration, otherwise a list of strings

property intent

The observing intents to which the calibrations apply.

Return type

string

property interp

The interp parameters to be used when applying these calibrations.

Return type

a scalar string if representing 1 calibration, otherwise a list of strings

property spw

The spectral windows to which the calibrations apply.

Return type

string

property spwmap

The spwmap parameters to be used when applying these calibrations.

Return type

a scalar string if representing 1 calibration, otherwise a list of strings

property vis

The name of the measurement set to which the calibrations apply.

Return type

string

class pipeline.infrastructure.callibrary.CalFrom(gaintable=None, gainfield='', interp='linear,linear', spwmap=None, caltype='unknown', calwt=True)[source]

Bases: object

CalFrom represents a calibration table and the CASA arguments that should be used when applying that calibration table.

CALTYPES

an enumeration of calibration table types identified by this code.

CALTYPE_TO_VISCAL

mapping of calibration type to caltable identifier as stored in the table header

VISCAL

mapping of calibration table header information to a description of that table type

CALTYPES = {'amp': 13, 'antpos': 6, 'bandpass': 2, 'finalcal': 11, 'gaincal': 1, 'gc': 7, 'kcross': 17, 'opac': 8, 'otf': 18, 'otfraster': 15, 'polarization': 5, 'ps': 14, 'rq': 9, 'swpow': 10, 'tecim': 16, 'tsys': 3, 'unknown': 0, 'uvcont': 12, 'wvr': 4}
CALTYPE_TO_VISCAL = {'antpos': ('KANTPOS JONES',), 'bandpass': ('B JONES', 'BPOLY'), 'gaincal': ('G JONES', 'GSPLINE', 'T JONES'), 'otf': ('SDSKY_OTF',), 'otfraster': ('SDSKY_RASTER',), 'ps': ('SDSKY_PS',), 'tsys': ('B TSYS',), 'uvcont': ('A MUELLER',)}
VISCAL = {'A MUELLER': 'A Mueller (baseline-based)', 'B JONES': 'B Jones (bandpass)', 'B TSYS': 'B TSYS (freq-dep Tsys)', 'BPOLY': 'B Jones Poly (bandpass)', 'D JONES': 'D Jones (instrumental polarization)', 'DF JONES': 'Df Jones (frequency-dependent instrumental polarization)', 'DFGEN JONES': 'Dfgen Jones (frequency-dependent instrumental polarization)', 'DGEN JONES': 'Dgen Jones (instrumental polarization)', 'G JONES': 'G Jones (electronic Gain)', 'GLINXPH JONES': 'GlinXph Jones (X-Y phase)', 'GSPLINE': 'G Jones SPLINE (elec. gain)', 'J JONES': 'J Jones (generic polarized gain)', 'KANTPOS JONES': 'KAntPos Jones (antenna position errors)', 'M MUELLER': 'M Mueller (baseline-based)', 'MF MUELLER': 'Mf Mueller (closure bandpass)', 'P JONES': 'P Jones (parallactic angle phase)', 'T JONES': 'T Jones (polarization-independent troposphere)', 'TF JONES': 'Tf Jones (frequency-dependent atmospheric complex gain)', 'TFOPAC': 'TfOpac (frequency-dependent opacity)', 'TOPAC': 'TOpac (Opacity corrections in amplitude)', 'X JONES': 'X Jones (antenna-based)', 'X MUELLER': 'X Mueller (baseline-based)', 'XF JONES': 'Xf Jones (antenna-based)'}
property caltype
property calwt
property gainfield
property gaintable
static get_caltype_for_viscal(viscal)[source]
property interp
property spwmap
pipeline.infrastructure.callibrary.CalLibrary

alias of pipeline.infrastructure.callibrary.IntervalCalLibrary

pipeline.infrastructure.callibrary.CalState

alias of pipeline.infrastructure.callibrary.IntervalCalState

class pipeline.infrastructure.callibrary.CalTo(vis=None, field='', spw='', antenna='', intent='')[source]

Bases: object

CalTo represents a target data selection to which a calibration can be applied.

property antenna
property field
static from_caltoargs(cta: pipeline.infrastructure.callibrary.CalToArgs)pipeline.infrastructure.callibrary.CalTo[source]
property intent
property spw
property vis
class pipeline.infrastructure.callibrary.CalToArgs(vis, spw, field, intent, antenna)

Bases: tuple

property antenna

Alias for field number 4

property field

Alias for field number 2

property intent

Alias for field number 3

property spw

Alias for field number 1

property vis

Alias for field number 0

class pipeline.infrastructure.callibrary.CalToIdAdapter(context, calto)[source]

Bases: object

property antenna
property field
get_field_intents(field_id, spw_id)[source]
property intent
property ms
property spw
class pipeline.infrastructure.callibrary.CalToIntervalAdapter(context, calto)[source]

Bases: object

class pipeline.infrastructure.callibrary.DictCalLibrary(context)[source]

Bases: object

CalLibrary is the root object for the pipeline calibration state.

property active

CalState holding CalApplications to be (pre-)applied to the MS.

add(calto, calfroms)[source]
property applied

CalState holding CalApplications that have been applied to the MS via the pipeline applycal task.

clear()[source]
export(filename=None)[source]

Export the pre-apply calibration state to disk.

The pre-apply calibrations held in the ‘active’ CalState will be written to disk as a set of equivalent applycal calls.

export_applied(filename=None)[source]

Export the applied calibration state to disk.

The calibrations held in the ‘applied’ CalState will be written to disk as a set of equivalent applycal calls.

get_calstate(calto, hide_null=True, ignore=None)[source]

Get the calibration state for a target data selection.

import_state(filename=None, append=False)[source]
mark_as_applied(calto, calfrom)[source]
class pipeline.infrastructure.callibrary.DictCalState(default_factory=<function _ms_dim>)[source]

Bases: collections.defaultdict

DictCalState is a data structure used to map calibrations for all data registered with the pipeline.

It is implemented as a multi-dimensional array indexed by data selection parameters (ms, spw, field, intent, antenna), with the end value being a list of CalFroms, representing the calibrations to be applied to that data selection.

as_applycal()[source]
static dictify(dd)[source]

Get a standard dictionary of the items in the tree.

get_caltable(caltypes=None)[source]

Get the names of all caltables registered with this CalState.

If an optional caltypes argument is given, only caltables of the requested type will be returned.

Parameters

caltypes – a single caltype, or a list of caltypes, drawn from the types known in CalFrom.CALTYPES.

Return type

set of strings

global_reactivate(calfroms)[source]

Reactivate a CalFrom that was marked as ignored through a call to global_remove.

This will reactivate the CalFrom entry, making it reappear at the position in the CalApplications at which it was originally registered. For example, if a CalFrom was ‘deleted’ via a call to global_remove and 3 more CalFroms were subsequently added to the CalState, the reactivated CalFrom will appear in its original position - that is, before the 3 subsequent CalFroms - rather than at the end of the list.

Parameters

calfroms (a set of CalFrom objects) – the CalFroms to reactivate

Returns

None

global_remove(calfrom)[source]

Mark a CalFrom as removed from the calibration state. Rather than iterating through the registered calibrations, this adds the CalFrom to a set of objects to be ignored. When the calibrations are subsequently inspected, CalFroms marked as removed will be bypassed.

Parameters

calfrom – the CalFrom to remove

Returns

merged(hide_empty=False)[source]
class pipeline.infrastructure.callibrary.IntervalCalLibrary(context)[source]

Bases: object

CalLibrary is the root object for the pipeline calibration state.

property active

CalState holding CalApplications to be (pre-)applied to the MS.

add(calto, calfroms)[source]
property applied

CalState holding CalApplications that have been applied to the MS via the pipeline applycal task.

clear()[source]
export(filename=None)[source]

Export the pre-apply calibration state to disk.

The pre-apply calibrations held in the ‘active’ CalState will be written to disk as a set of equivalent applycal calls.

export_applied(filename=None)[source]

Export the applied calibration state to disk.

The calibrations held in the ‘applied’ CalState will be written to disk as a set of equivalent applycal calls.

get_calstate(calto, ignore=None)[source]

Get the active calibration state for a target data selection.

Parameters
  • calto – the data selection

  • ignore

Returns

import_state(filename=None, append=False)[source]
mark_as_applied(calto, calfrom)[source]
unregister_calibrations(predicate_fn: Callable[[pipeline.infrastructure.callibrary.CalToArgs, pipeline.infrastructure.callibrary.CalFrom], bool])[source]

Delete active calibrations that match the input predicate function.

Previously, calibrations had to be removed by calling private callibrary functions, e.g.,

calto = callibrary.CalTo(self.inputs.vis)
calfrom = callibrary.CalFrom(gaintable=ktypecaltable, interp='', calwt=False)
context.callibrary._remove(calto, calfrom, context.callibrary._active)

This function makes calibration removal a first-class public function of the callibrary, and requires less knowledge of the calibration to remove.

The predicate function passed in by the caller defines which calibrations should be unregistered. For example, Tsys caltable removal can be achieved with the code below.

def match_tsys(calto, calfrom):
    return calfrom.caltype == 'tsys'

context.callibrary.unregister_calibrations(match_tsys)

The pipeline inserts the task name into the caltable filename, which can be used to unregister caltables generated by that task. For example:

def match_task_caltable(calto, calfrom):
    return 'hifa_bandpass' in calfrom.gaintable

context.callibrary.unregister_calibrations(match_task_caltable)

If you want to match on calwt, interp, vis, etc., that can be done in the matcher function too; but if it is not necessary to identify the caltable, it does not need to be tested in the predicate function.

class pipeline.infrastructure.callibrary.IntervalCalState[source]

Bases: object

CalState is a data structure used to map calibrations for all data registered with the pipeline.

It is implemented as a multi-dimensional array indexed by data selection parameters (ms, spw, field, intent, antenna), with the end value being a list of CalFroms, representing the calibrations to be applied to that data selection.

as_applycal()[source]
clear()[source]
static create_from_context(context)[source]
export_to_casa_callibrary(ms, callibfile)[source]
static from_calapplication(context, calto, calfroms)[source]
get_caltable(caltypes=None) → Set[str][source]

Get the names of all caltables registered with this CalState.

If an optional caltypes argument is given, only caltables of the requested type will be returned.

Parameters

caltypes – a single caltype, or a list of caltypes, drawn from the types known in CalFrom.CALTYPES.

Return type

set of strings

merged(hide_empty=False)[source]
trimmed(context, calto)[source]

Return a copy of this IntervalCalState trimmed to the specified CalTo data selection.

Parameters
  • context – the pipeline context

  • calto – the CalTo data selection to trim to

class pipeline.infrastructure.callibrary.TimestampedData(time, data, marker=None)[source]

Bases: pipeline.infrastructure.callibrary.TimestampedDataBase

cmp(other)[source]

Tells whether other sorts before, after, or equal to this Interval.

Sorting is by time then by data fields.

If data fields are not both sortable types, data fields are compared alphabetically by type name.

Parameters

other – the Interval to compare against

Returns

-1, 0, or 1

Return type

int

pipeline.infrastructure.callibrary.ant_add(td1, td2, join=<function merge_intervaltrees.<locals>.m>)
pipeline.infrastructure.callibrary.ant_sub(td1, td2, join=<function merge_intervaltrees.<locals>.m>)
pipeline.infrastructure.callibrary.consolidate_calibrations(all_my_calapps)[source]

Consolidate a list of (CalTo, [CalFrom..]) 2-tuples into a smaller set of equivalent applications by consolidating their data selection arguments.

This function works by merging the data selections of CalTo objects that have the same calibration application, as determined by the values and data selection present in the CalFroms.

Parameters

all_my_calapps – an iterable of (CalTo, [CalFrom..]) 2-tuples

Returns

a list of (CalTo, [CalFrom..]) tuples

pipeline.infrastructure.callibrary.contiguous_sequences(l)[source]

Group a sequence of numbers into contiguous groups

Parameters

l – a sequence

Returns

list of Intervals

pipeline.infrastructure.callibrary.copy_calapplication(calapp, origin=None, **overrides)[source]

Copy a CalApplication, overwriting any CalTo or CalFrom values with the given override values.

For instance, to create a copy of a CalApplication with the CalFrom.calwt set to True and the CalTo.spw set to 9:

modified = copy_calapplication(calapp, calwt=True, spw=9)

Parameters
  • calapp – CalApplication to copy

  • origin – origin to set, or None to copy the origin from calapp

  • overrides – kw/val pairs of calto/calfrom attributes to override

Returns

CalApplication instance

pipeline.infrastructure.callibrary.create_data_reducer(join)[source]

Return a function that creates a new TimestampedData object containing the result of executing the given operation on two TimestampedData objects.

The use case for this function is actually quite simple: perform an operation on two TimestampedData objects (add, subtract, etc.) and put the result in a new TimestampedData object.

The resulting TimestampedData object has a creation time equal to that of the oldest input object.

Parameters

join – the function to call on the two input objects

Returns

pipeline.infrastructure.callibrary.create_interval_tree(a)[source]

Create an IntervalTree containing a set of Intervals.

The input argument used to create the Intervals is an iterable of 3-tuples, each 3-tuple defined as:

(interval start, interval end, function giving value for that interval).

Parameters

a – the iterable of argument tuples

Returns

IntervalTree

pipeline.infrastructure.callibrary.create_interval_tree_for_ms(ms)[source]

Create a new IntervalTree fitted to the dimensions of a measurement set.

This function creates a new IntervalTree with the size of the antenna, spw, field and intent dimensions fitted to envelop of the input measurement set.

Parameters

ms

Returns

an IntervalTree

pipeline.infrastructure.callibrary.create_interval_tree_nd(intervals, value_fn)[source]

Create a multidimensional IntervalTree. Each Interval within the IntervalTree points to the next dimension, with the final Interval containing the value given by calling value_fn.

Parameters

  • intervals – a list of Interval lists, with the range of the final (deepest) dimension first, ending with the range of the root dimension

  • value_fn – a function that returns the value for the final dimension

Returns

an IntervalTree

pipeline.infrastructure.callibrary.data_selection_contains(proposed, calto_args)[source]

Return True if one data selection is contained within another.

Parameters
Returns

True if data selection 2 is contained within data selection 1

pipeline.infrastructure.callibrary.defrag_interval_tree(tree)[source]

Condense an IntervalTree by consolidating fragmented entries with the same value into contiguous Intervals.

Parameters

tree

Returns

pipeline.infrastructure.callibrary.expand_calstate(calstate)[source]

Convert an IntervalCalState into the equivalent consolidated list of (CalTo, [CalFrom..]) 2-tuples.

This function is the top-level entry point for converting a calibration state to 2-tuples. It consolidates data selections and converts numeric data selection IDs to friendly equivalents through downstream processing.

Parameters

calstate – the IntervalCalState to convert

Returns

a list of (CalTo, [CalFrom..]) tuples

pipeline.infrastructure.callibrary.expand_calstate_to_calapps(calstate: pipeline.infrastructure.callibrary.IntervalCalState) → List[Tuple[pipeline.infrastructure.callibrary.CalTo, List[pipeline.infrastructure.callibrary.CalFrom]]][source]

Convert an IntervalCalState into a list of (CalTo, [CalFrom..]) tuples.

Parameters

calstate – the IntervalCalState to convert

Returns

a list of 2-tuples, first element a CalTo, second element a list of CalFroms

pipeline.infrastructure.callibrary.expand_interval(interval, calto_args, calto_fn)[source]

Convert an Interval into the equivalent list of (CalTo, [CalFrom..]) 2-tuples.

This function is the partner function to expand_intervaltree. See the documentation for expand_intervaltree for more details on the argument format for this function.

Parameters
  • interval – the Interval to convert

  • calto_args – the list of (argument name, conversion function) 2-tuples for the remaining dimensions

  • calto_fn – the partial CalToArgs application

Returns

a list of (CalTo, [CalFrom..]) tuples

pipeline.infrastructure.callibrary.expand_intervaltree(tree, convert_fns, calto_fn)[source]

Convert an IntervalTree into the equivalent list of (CalTo, [CalFrom..]) 2-tuples.

The second argument for this function is a list of 2-tuples of (CalToArgs constructor argument for this dimension, value conversion function for this dimension). The conversion function takes a set of integer indexes and converts it to a suitable (probably more human-readable) value for that dimension, e.g. a conversion from field ID to field name. So, for a dimension that supplies the ‘antenna’ argument to CalToArgs and should prefix ‘DV’ to each antenna index, the tuple for that dimension could be (‘antenna’, lambda ids: {‘DV%s’ % i for i in ids}).

The third argument is the partially-applied CalToArgs constructor. A CalToArgs needs a number of arguments (vis, field, spw, etc.), each of which corresponds to a dimension of the IntervalTree and which must be supplied at CalToArgs creation time. To achieve this while iterating through the dimensions (when the constructor arguments are not fully known), object creation is delayed by performing just a partial application, adding the keyword for the current dimension to the partial application. At the final leaf node, when all constructor arguments have been partially applied, we can call the partial function and get the CalToArgs.

Parameters
  • tree – the IntervalTree to convert

  • convert_fns – the list of (argument name, conversion function) 2-tuples for the remaining dimensions

  • calto_fn – the partial CalToArgs application

Returns

a list of (CalTo, [CalFrom..]) tuples
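The delayed construction described above can be sketched with functools.partial; the data selection values below are illustrative only:

from functools import partial

from pipeline.infrastructure.callibrary import CalToArgs

# accumulate one constructor keyword per dimension visited...
calto_fn = partial(CalToArgs, vis={'a.ms'})
calto_fn = partial(calto_fn, antenna={0, 1})
calto_fn = partial(calto_fn, spw={17}, field={'3c286'})

# ...and create the object at the leaf node, once all arguments are known
calto_args = calto_fn(intent={'BANDPASS'})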

pipeline.infrastructure.callibrary.field_add(td1, td2, join=<function merge_intervaltrees.<locals>.m>)
pipeline.infrastructure.callibrary.field_sub(td1, td2, join=<function merge_intervaltrees.<locals>.m>)
pipeline.infrastructure.callibrary.fix_cycle0_data_selection(context, calstate)[source]
pipeline.infrastructure.callibrary.get_calstate_shape(ms)[source]

Get an IntervalTree shaped to the dimensions of the given measurement set.

This function calculates the size of each metadata dimension (spw; intent; field; antenna), creating and returning an IntervalTree shaped to match. The output of this function is used to trim a calibration applied globally in one or more dimensions to a valid data selection.

Output from this function is cached as it can take several seconds to calculate the result, which is done repeatedly when importing a calstate containing many entries.

Note: this assumes that the shape of an MS never changes, which should be true; the number of spws, fields, ants, etc. never changes.

Parameters

ms – the MeasurementSet to analyse

Returns

IntervalTree shaped to match valid data dimensions

pipeline.infrastructure.callibrary.get_calto_from_inputs(inputs)[source]

Get a CalTo data selection object based on the state of an Inputs object

pipeline.infrastructure.callibrary.get_id_to_field_fn(ms_to_id_to_field)[source]

Return a function that can convert field IDs to a field name.

Takes a dict of dicts, first key mapping measurement set name and second key mapping numeric field ID to field name, eg.

{‘a.ms’: {0: ‘field 1’, 1: ‘field 2’}}

Parameters

ms_to_id_to_field – dict of vis : field ID : field name

Returns

a function that converts a set of field IDs to a set of field names (or field IDs, if the names are not unique)

pipeline.infrastructure.callibrary.get_id_to_intent_fn(id_to_intent)[source]

Return a function that can convert intent IDs to a string intent.

Takes a dict of dicts, first key mapping measurement set name and second key mapping numeric intent ID to string intent for that MS, e.g.

{‘a.ms’: {0: ‘PHASE’, 1: ‘BANDPASS’}}

Parameters

id_to_intent – dict of vis : intent ID : string intent

Returns

a function that converts a set of intent IDs to a set of string intents

pipeline.infrastructure.callibrary.get_intent_id_map(ms)[source]

Get the mapping of intent ID to string intent for a measurement set.

Parameters

ms – the measurement set to analyse

Returns

a dict of intent ID: intent

pipeline.infrastructure.callibrary.get_matching_calstate(context: pipeline.infrastructure.launcher.Context, calstate: pipeline.infrastructure.callibrary.IntervalCalState, predicate_fn: Callable[[pipeline.infrastructure.callibrary.CalToArgs, pipeline.infrastructure.callibrary.CalFrom], bool])pipeline.infrastructure.callibrary.IntervalCalState[source]

Return an IntervalCalState containing the calibrations in the input IntervalCalState that match the predicate function.

The use case for this function is to identify calibrations matching a pattern so that those calibrations can be deleted or modified. For instance, matching registered bandpass caltables so they can be removed from the active CalState.

Parameters
  • context – pipeline context (required to create IntervalCalState)

  • calstate – calibration state to inspect

  • predicate_fn – matching function that returns True when the selection is to be added to the output IntervalCalState
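A hedged sketch of the bandpass example mentioned above; context is assumed to be an existing pipeline Context, and the predicate name is illustrative:

from pipeline.infrastructure.callibrary import get_matching_calstate

def is_bandpass(calto_args, calfrom):
    # True for any registered bandpass caltable
    return calfrom.caltype == 'bandpass'

# identify matching calibrations in the active calibration state
matching = get_matching_calstate(context, context.callibrary.active, is_bandpass)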

pipeline.infrastructure.callibrary.get_min_max(l, keyfunc=None)[source]
pipeline.infrastructure.callibrary.intent_add(td1, td2, join=<function merge_lists.<locals>.m>)
pipeline.infrastructure.callibrary.intent_sub(td1, td2, join=<function merge_lists.<locals>.m>)
pipeline.infrastructure.callibrary.interval_to_set(interval)[source]

Get all the indexes covered by an Interval.

Parameters

interval

Returns

pipeline.infrastructure.callibrary.merge_contiguous_intervals(tree)[source]

Merge contiguous Intervals with the same value into one Interval.

Parameters

tree – an IntervalTree

Returns

new IntervalTree with merged Intervals

pipeline.infrastructure.callibrary.merge_intervaltrees(on_intersect)[source]

Return a function that merges two IntervalTrees, executing a function on the intersecting Interval ranges in the resulting merged IntervalTree.

Parameters

on_intersect – the function to call on overlapping Intervals

Returns

function

pipeline.infrastructure.callibrary.merge_lists(join_fn=<built-in function add>)[source]

Return a function that merges two lists by calling the input operation on the two input arguments.

Parameters

join_fn

Returns

pipeline.infrastructure.callibrary.ranges(lst)[source]
pipeline.infrastructure.callibrary.safe_join(vals, separator=',')[source]
pipeline.infrastructure.callibrary.sequence_to_casa_range(seq)[source]
pipeline.infrastructure.callibrary.sequence_to_range(l)
pipeline.infrastructure.callibrary.set_calstate_marker(calstate, marker)[source]

Return a copy of a calstate, modified so that the TimestampedData objects in the final leaf node are annotated with the given marker object.

Technical details:

CalFroms are flyweight objects, so two identical CalFroms have the same hash. Identical hashes stop the IntervalTree union function from working as expected: IntervalTrees are based on sets, and as such adding two lists of CalFrom with identical hashes results in just one CalFrom list in the final IntervalTree, when we actually wanted the duplicate to be added.

This function is used to ensure that CalState arithmetic works as expected. By changing the TimestampedData marker and thus making the hashes different, ‘identical’ calibrations can indeed be duplicated in the IntervalTree union operation, and subsequently operated on in a merge_equals step.

Parameters
  • calstate – the calstate to modify

  • marker – the object to annotate calstates with

Returns

annotated calstate

pipeline.infrastructure.callibrary.spw_add(td1, td2, join=<function merge_intervaltrees.<locals>.m>)
pipeline.infrastructure.callibrary.spw_sub(td1, td2, join=<function merge_intervaltrees.<locals>.m>)
pipeline.infrastructure.callibrary.trim(tree, ranges)[source]

Return an IntervalTree trimmed to the specified ranges.

Ranges are specified as tuples of (begin, end).

Parameters
  • tree – the IntervalTree to trim

  • ranges – a list of range tuples

Returns

the trimmed IntervalTree

pipeline.infrastructure.callibrary.trim_nd(tree, selection)[source]

Return an IntervalTree with each dimension trimmed to the specified set of ranges.

The data selection for each dimension is specified as a sequence of (begin, end) tuples; the data selection for the tree as a whole is a sequence of these dimension sequences. For example, the data selection

[ [(1, 3)], [(0, 5), (7, 8)] ]

would select 1-3 from the first dimension, and 0-5 and 7-8 from the second dimension.

Parameters
  • tree – the IntervalTree to trim

  • selection – the sequence of data selections for each dimension

Returns

pipeline.infrastructure.callibrary.trim_to_valid_data_selection(calstate, vis=None)[source]

Trim an IntervalCalState to the shape of valid (present) data selections.

This is achieved by trimming Intervals for each dimension (antenna, spw, field, intent) to exclude ranges for which no data is present.

See CAS-9415: CalLibrary needs a way to filter out calibration applications for missing data selections

Parameters
  • calstate – the calstate to shape

  • vis – name of the calstate to shape. If not defined, shape all.

Returns

a new, shaped IntervalCalState

pipeline.infrastructure.callibrary.unit(x)[source]

pipeline.infrastructure.casa_tasks module

This module contains a wrapper function for every CASA task. The signature of each method exactly matches that of the CASA task it mirrors. However, rather than executing the task directly when called, each wrapper returns a JobRequest for the invocation; these jobs can then be examined and executed at a later date.

The CASA task implementations are located at run-time, and a proxy for each task is attached to this module at import time. The name and signature of each method will match those of the tasks in the CASA environment when this module was imported.
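A usage sketch; the measurement set name is illustrative, and the job is assumed to be run via the execute() method that JobRequest provides as an implementation of the Task interface described in the api module above:

from pipeline.infrastructure import casa_tasks

# creating the job does not run the CASA task...
job = casa_tasks.listobs(vis='uid___X02_X3d737_X1.ms')

# ...the task only runs when the job is executed
result = job.execute(dry_run=False)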

pipeline.infrastructure.casa_tasks.applycal(*v, **k)[source]
pipeline.infrastructure.casa_tasks.bandpass(*v, **k)[source]
pipeline.infrastructure.casa_tasks.calstat(*v, **k)[source]
pipeline.infrastructure.casa_tasks.clearcal(*v, **k)[source]
pipeline.infrastructure.casa_tasks.copyfile(*v, **k)[source]
pipeline.infrastructure.casa_tasks.copytree(*v, **k)[source]
pipeline.infrastructure.casa_tasks.delmod(*v, **k)[source]
pipeline.infrastructure.casa_tasks.exportfits(*v, **k)[source]
pipeline.infrastructure.casa_tasks.flagcmd(*v, **k)[source]
pipeline.infrastructure.casa_tasks.flagdata(*v, **k)[source]
pipeline.infrastructure.casa_tasks.flagmanager(*v, **k)[source]
pipeline.infrastructure.casa_tasks.fluxscale(*v, **k)[source]
pipeline.infrastructure.casa_tasks.gaincal(*v, **k)[source]
pipeline.infrastructure.casa_tasks.gencal(*v, **k)[source]
pipeline.infrastructure.casa_tasks.hanningsmooth(*v, **k)[source]
pipeline.infrastructure.casa_tasks.imdev(*v, **k)[source]
pipeline.infrastructure.casa_tasks.imhead(*v, **k)[source]
pipeline.infrastructure.casa_tasks.immath(*v, **k)[source]
pipeline.infrastructure.casa_tasks.immoments(*v, **k)[source]
pipeline.infrastructure.casa_tasks.impbcor(*v, **k)[source]
pipeline.infrastructure.casa_tasks.importasdm(*v, **k)[source]
pipeline.infrastructure.casa_tasks.imregrid(*v, **k)[source]
pipeline.infrastructure.casa_tasks.imstat(*v, **k)[source]
pipeline.infrastructure.casa_tasks.imsubimage(*v, **k)[source]
pipeline.infrastructure.casa_tasks.imval(*v, **k)[source]
pipeline.infrastructure.casa_tasks.initweights(*v, **k)[source]
pipeline.infrastructure.casa_tasks.listobs(*v, **k)[source]
pipeline.infrastructure.casa_tasks.move(*v, **k)[source]
pipeline.infrastructure.casa_tasks.mstransform(*v, **k)[source]
pipeline.infrastructure.casa_tasks.partition(*v, **k)[source]
pipeline.infrastructure.casa_tasks.plotants(*v, **k)[source]
pipeline.infrastructure.casa_tasks.plotbandpass(*v, **k)[source]
pipeline.infrastructure.casa_tasks.plotms(*v, **k)[source]
pipeline.infrastructure.casa_tasks.plotweather(*v, **k)[source]
pipeline.infrastructure.casa_tasks.polcal(*v, **k)[source]
pipeline.infrastructure.casa_tasks.rmtree(*v, **k)[source]
pipeline.infrastructure.casa_tasks.sdbaseline(*v, **k)[source]
pipeline.infrastructure.casa_tasks.sdcal(*v, **k)[source]
pipeline.infrastructure.casa_tasks.sdimaging(*v, **k)[source]
pipeline.infrastructure.casa_tasks.setjy(*v, **k)[source]
pipeline.infrastructure.casa_tasks.split(*v, **k)[source]
pipeline.infrastructure.casa_tasks.statwt(*v, **k)[source]
pipeline.infrastructure.casa_tasks.tclean(*v, **k)[source]
pipeline.infrastructure.casa_tasks.tsdimaging(*v, **k)[source]
pipeline.infrastructure.casa_tasks.uvcontfit(*v, **k)[source]
pipeline.infrastructure.casa_tasks.visstat(*v, **k)[source]
pipeline.infrastructure.casa_tasks.wvrgcal(*v, **k)[source]

pipeline.infrastructure.casa_tools module

pipeline.infrastructure.casa_tools.AgentFlagger(filename, **kwargs)
pipeline.infrastructure.casa_tools.CalAnalysis(filename, **kwargs)
pipeline.infrastructure.casa_tools.ImageReader(filename, **kwargs)
pipeline.infrastructure.casa_tools.ImagerReader(filename, **kwargs)
pipeline.infrastructure.casa_tools.MSMDReader(filename, **kwargs)
pipeline.infrastructure.casa_tools.MSReader(filename, **kwargs)
pipeline.infrastructure.casa_tools.SelectvisReader(filename, **kwargs)
pipeline.infrastructure.casa_tools.TableReader(filename, **kwargs)
pipeline.infrastructure.casa_tools.context_manager_factory(tool_cls, finalisers=None)[source]

Create a context manager function that wraps the given CASA tool.

The returned context manager function takes one argument: a filename. The function opens the file using the CASA tool, returning the tool so that it may be used for queries or other operations pertaining to the tool. The tool is closed once it falls out of scope or an exception is raised.
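For example, the TableReader wrapper defined above can be used as a context manager; the measurement set name is illustrative:

from pipeline.infrastructure import casa_tools

# the table tool is opened on entry and closed when the block exits,
# even if an exception is raised
with casa_tools.TableReader('uid___X02_X3d737_X1.ms') as tb:
    nrows = tb.nrows()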

pipeline.infrastructure.casa_tools.create_logging_class(cls, level=5, to_log=None)[source]

Return a class with all methods decorated to log method calls.

Parameters
  • cls – class to wrap

  • level – log level for emitted messages

  • to_log – methods to log calls for, or None to log all methods

Returns

the decorated class

pipeline.infrastructure.casa_tools.log_call(fn, level)[source]

Decorate a function or method so that all invocations of that function or method are logged.

Parameters
  • fn – function to decorate

  • level – log level (e.g., logging.INFO, logging.WARNING, etc.)

Returns

decorated function

pipeline.infrastructure.casa_tools.pickler(_)
pipeline.infrastructure.casa_tools.post_to_log(comment='', echo_to_screen=True)[source]
pipeline.infrastructure.casa_tools.selectvis_context_manager(tool_cls)[source]

Create an imager tool context manager function that opens the MS using im.selectvis in read-only mode.

The returned context manager function takes one argument: a filename. The function opens the file using the CASA imager tool, returning the tool so that it may be used for queries or other operations pertaining to the tool. The tool is closed once it falls out of scope or an exception is raised.

pipeline.infrastructure.casa_tools.set_log_origin(fromwhere='')[source]
pipeline.infrastructure.casa_tools.tool_type

alias of pipeline.infrastructure.casa_tools.LoggingVpmanager

pipeline.infrastructure.casa_tools.unpickler(data)

pipeline.infrastructure.contfilehandler module

Class to handle continuum frequency range files.

The text files contain frequency ranges per source and spw, expressed in CASA syntax. The keyword “NONE” indicates that no continuum frequency range was detected.
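An illustrative sketch of such a file, assuming cont.dat-style conventions of one field per block, one spw per subsection, and one LSRK frequency range per line; all values below are made up:

Field: 3c286

SpectralWindow: 17
220.15~220.45GHz LSRK
221.00~221.32GHz LSRK

SpectralWindow: 19
NONE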

class pipeline.infrastructure.contfilehandler.ContFileHandler(filename, warn_nonexist=False)[source]

Bases: object

get_merged_selection(field_name, spw_id, cont_ranges=None)[source]
lsrk_to_topo(selection, msnames, fields, spw_id, observing_run, ctrim=0, ctrim_nchan=- 1)[source]
read(skip_none=False, warn_nonexist=False)[source]
write(cont_ranges=None)[source]

pipeline.infrastructure.exceptions module

exception pipeline.infrastructure.exceptions.PipelineException[source]

Bases: Exception

Pipeline exception class

pipeline.infrastructure.executeppr module

pipeline.infrastructure.executeppr.executeppr(pprXmlFile: str, importonly: bool = True, breakpoint: str = 'breakpoint', bpaction: str = 'ignore', loglevel: str = 'info', plotlevel: str = 'default', interactive: bool = True)[source]

Runs Pipeline Processing Request (PPR).

Executes pipeline tasks based on instructions described in pprXmlFile.

Parameters
  • pprXmlFile – A path to PPR file.

  • importonly – Whether to stop processing after importing the data. If True, execution of the PPR stops after the h*_importdata stage. The parameter has no effect if there is no h*_importdata stage in the PPR.

  • breakpoint – The name of the command to be treated as a breakpoint.

  • bpaction – An action to be taken at the breakpoint. Available actions are: ‘ignore’ (ignore the breakpoint in pprXmlFile), ‘break’ (stop execution at the breakpoint in pprXmlFile), and ‘resume’ (resume the last context and restart processing after the breakpoint in pprXmlFile).

  • loglevel – A logging level. Available levels are ‘critical’, ‘error’, ‘warning’, ‘info’, ‘debug’, ‘todo’, and ‘trace’.

  • plotlevel – A plot level. Available levels are ‘all’, ‘default’, and ‘summary’.

  • interactive – If True, print pipeline log to STDOUT.

Examples

Only import EBs.

>>> executeppr('PPR_uid___A001_X14c3_X1dd.xml')

Full execution of PPR.

>>> executeppr('PPR_uid___A001_X14c3_X1dd.xml', importonly=False)

Run pipeline tasks up to the ‘breakpoint’ in PPR and save the context.

>>> executeppr('PPR_uid___A001_X14c3_X1dd.xml', importonly=False, bpaction='break')

Resume execution from the ‘breakpoint’ in PPR.

>>> executeppr('PPR_uid___A001_X14c3_X1dd.xml', importonly=False, bpaction='resume')

pipeline.infrastructure.executevlappr module

pipeline.infrastructure.executevlappr.executeppr(pprXmlFile, importonly=True, dry_run=False, loglevel='info', plotlevel='summary', interactive=True)[source]

pipeline.infrastructure.filenamer module

class pipeline.infrastructure.filenamer.ASDM(other=None)[source]

Bases: pipeline.infrastructure.filenamer.NamingTemplate

Defines the ASDM naming scheme.

ASDM file names have the syntax <project code>.<ASDM UID>.asdm, eg. pcode.uid___X02_X3d737_X1.asdm.

asdm(uid)[source]

Set the ASDM UID for this template, eg. uid://X03/XA83C/X02.

class pipeline.infrastructure.filenamer.AmplitudeCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.AntposCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.BandpassCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.CASALog(other=None)[source]

Bases: pipeline.infrastructure.filenamer.NamingTemplate

class pipeline.infrastructure.filenamer.CalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.NamingTemplate

Defines the calibration table naming scheme.

File names have the syntax: <ASDM UID>.<spgrp>.<pol>.<fit>.<type>.<format>, eg. uid___X02_X3d737_X1.spwgrp1.X.channel.bcal.tbl.

amplitude_cal()[source]

Set the filename extension as appropriate for an amplitude calibration.

amplitude_only_gain_cal()[source]

Set the filename extension as appropriate for an amplitude-only gain calibration.

antpos_cal()[source]

Set the filename extension as appropriate for an antpos calibration.

asdm(uid)[source]

Set the ASDM UID for this template, eg. uid://X03/XA83C/X02.

bandpass_cal()[source]

Set the filename extension as appropriate for a bandpass calibration.

channel_fit()[source]

Set the ‘type’ filename component for a per-channel calibration fit.

delay_cal()[source]

Set the filename extension as appropriate for a delay calibration.

extension(extension)[source]

Set the extension for this calibration table.

The extension is not validated against the known set of calibration table extensions, so where possible it is preferable that you use one of the calibration type convenience methods: bandpass_cal(), focus_cal(), delay_cal() etc.

flag_marks(flag_marks)[source]

Set the flag marks tag for this calibration table.

The flag marks tag is a free parameter, and is not currently part of the file naming scheme proposal.

flux_cal()[source]

Set the filename extension as appropriate for a flux calibration.

gain_cal()[source]

Set the filename extension as appropriate for a gain calibration.

gc_cal()[source]

Set the filename extension as appropriate for a gc calibration.

instrumentpol_cal()[source]

Set the filename extension as appropriate for an instrument pol calibration.

method(method)[source]

Set the calibration method for this calibration table.

The method argument is a free parameter, and is not currently part of the file naming scheme proposal. It is used to show how a particular calibration was calculated - in addition to the ‘type’ parameter that should also be set on a calibration table.

opac_cal()[source]

Set the filename extension as appropriate for an opac calibration.

phase_only_gain_cal()[source]

Set the filename extension as appropriate for a phase-only gain calibration.

polarization(polarization)[source]

Set the polarization for this calibration table, eg. ‘XX’, ‘YY’.

polarization_cal()[source]

Set the filename extension as appropriate for a polarization calibration.

poly_fit()[source]

Set the ‘type’ filename component for a polynomial calibration fit.

rq_cal()[source]

Set the filename extension as appropriate for a rq calibration.

sdbaseline()[source]

Set the filename extension as appropriate for a single dish baseline subtraction.

sdsky_cal()[source]

Set the filename extension as appropriate for a single dish sky calibration.

smooth(smooth)[source]

Set the smoothing width for this calibration table.

The smoothing width is a free parameter, and is not currently part of the file naming scheme proposal.

solint(solint)[source]

Set the solution interval tag for this calibration table.

The solution interval tag is a free parameter, and is not currently part of the file naming scheme proposal.

source(source)[source]
spectral_window(window)[source]
spectral_window_nochan(window)[source]
spline_fit()[source]

Set the ‘type’ filename component for a spline calibration fit.

stage(stage)[source]
swpow_cal()[source]

Set the filename extension as appropriate for a swpow calibration.

tecim_cal()[source]

Set the filename extension as appropriate for a tecmaps calibration.

time_series_fit()[source]

Set the ‘type’ filename component for a time-series calibration fit.

tsys_cal()[source]

Set the filename extension as appropriate for a tsys calibration.

type(type)[source]

Set the type component for this calibration table.

The type is not validated, so where possible it is preferable that you use one of the fit type convenience methods, eg. channel_fit(), poly_fit(), spline_fit() etc.

uvcont_cal()[source]

Set the filename extension as appropriate for a uv continuum calibration.

wvrg_cal()[source]

Set the filename extension as appropriate for a wvr calibration.

xyf0_cal()[source]

Set the filename extension as appropriate for an XYf0 calibration.

class pipeline.infrastructure.filenamer.DelayCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.FileNameComponentBuilder[source]

Bases: object

antenna(antenna)[source]
asdm(uid)[source]
band(band)[source]
build()[source]
extension(extension)[source]
flag_marks(flag_marks)[source]
format(format)[source]
intent(intent)[source]
iteration(iteration)[source]
line_region(start_channel, end_channel)[source]
method(method)[source]
output_dir(output_dir)[source]
polarization(polarization)[source]
smooth(smooth)[source]
solint(solint)[source]
source(source_name)[source]
specmode(specmode)[source]
spectral_window(window)[source]
spectral_window_nochan(window)[source]
stage(stage)[source]
task(task)[source]
type(type)[source]
class pipeline.infrastructure.filenamer.FlaggingTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.NamingTemplate

Defines the flagging table naming scheme.

File names have the syntax: <ASDM UID>.flags.tbl, eg. uid___X02_X3d737_X1.flags.tbl.

asdm(uid)[source]

Set the ASDM UID for this template, eg. uid://X03/XA83C/X02.

class pipeline.infrastructure.filenamer.FluxCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.GainCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.GainCurvesCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.Image[source]

Bases: pipeline.infrastructure.filenamer.NamingTemplate

antenna(antenna)[source]
band(band)[source]
bandpass()[source]
clean()[source]
clean_mask()[source]
continuum_image()[source]
dirty()[source]
flag_marks(flag_marks)[source]

Set the flag marks tag for this image.

The flag marks tag is a free parameter, and is not currently part of the file naming scheme proposal.

flat_flux_clean()[source]
flat_flux_residual()[source]
flux()[source]
fluxscale()[source]
gain()[source]
integrated_fluxscale()[source]
intent(intent)[source]
iteration(iteration)[source]
line_region(start_channel, end_channel)[source]
model()[source]
polarization(polarization)[source]
psf()[source]
residual()[source]
science()[source]
single_dish()[source]
source(source_name)[source]
specmode(specmode)[source]
spectral_image()[source]
spectral_window(window)[source]
stage(stage)[source]
type(type)[source]
class pipeline.infrastructure.filenamer.InstrumentPolCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.MeasurementSet(other=None)[source]

Bases: pipeline.infrastructure.filenamer.NamingTemplate

Defines the measurement set naming scheme.

File names have the syntax: <project code>.<ASDM UID>.ms.tbl, eg. pcode.uid___X02_X3d737_X1.ms.tbl.

asdm(uid)[source]

Set the ASDM UID for this template, eg. uid://X03/XA83C/X02.

class pipeline.infrastructure.filenamer.MosaicImage[source]

Bases: pipeline.infrastructure.filenamer.Image

flag_marks(flag_marks)[source]

Set the flag marks tag for this image.

The flag marks tag is a free parameter, and is not currently part of the file naming scheme proposal.

class pipeline.infrastructure.filenamer.NamingTemplate[source]

Bases: object

Base class used for all naming templates.

dry_run = True
get_filename(delete=False)[source]

Assembles and returns the final filename.

output_dir(output_dir)[source]

Set the base output directory for this template.

task = None
class pipeline.infrastructure.filenamer.OpCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.RqCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.SDBaselineTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.SDSkyCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.SwpowCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.TecMapsCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.TsysCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.UVcontCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.WvrgCalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

class pipeline.infrastructure.filenamer.XYf0CalibrationTable(other=None)[source]

Bases: pipeline.infrastructure.filenamer.CalibrationTable

pipeline.infrastructure.filenamer.fitsname(products_dir, imagename, version=1)[source]

Strip off stage and iter information to generate FITS file name.

pipeline.infrastructure.filenamer.sanitize(text, valid_chars=None)[source]
pipeline.infrastructure.filenamer.sort_spws(unsorted)[source]

pipeline.infrastructure.imageheader module

pipeline.infrastructure.imageheader.clean_extendable_keys(data, key, num_keys=None)[source]

Remove extra entries from data. The logic is as follows:

  1. if num_keys is not given, take the number of expected entries from the data using key (“n{key}”)

  2. find entries whose keyword is “{key}X”, where X denotes an integer

  3. remove each such entry if X > num_keys

Parameters
  • data {dict} – Dictionary to be processed

  • key {str} – Key for the dictionary

Keyword Arguments

num_keys {int} – Number of expected entries for the given key. If not given (None), the number is taken from data. (default: {None})

Returns

dict – Reference to the data
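
A minimal sketch of this logic; the dictionary contents are illustrative:

    from pipeline.infrastructure.imageheader import clean_extendable_keys

    data = {'nfield': 2, 'field1': 'M31', 'field2': 'M33', 'field3': 'stale'}

    # num_keys is read from data['nfield'] (= 2), so 'field3' is removed
    cleaned = clean_extendable_keys(data, 'field')
    assert 'field3' not in cleaned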

pipeline.infrastructure.imageheader.set_miscinfo(name, spw=None, field=None, nfield=None, type=None, iter=None, multiterm=None, intent=None, specmode=None, robust=None, is_per_eb=None, context=None)[source]

Define miscellaneous image information

pipeline.infrastructure.imagelibrary module

class pipeline.infrastructure.imagelibrary.ImageItem(imagename, sourcename, spwlist, specmode, sourcetype, multiterm=None, imageplot='', metadata={}, imaging_params={}, org_direction=None)[source]

Bases: object

class pipeline.infrastructure.imagelibrary.ImageLibrary[source]

Bases: object

add_item(imageitem, overwrite=True)[source]
clear_imlist()[source]
delete_item(imageitem)[source]
find_imageitem(imageitem)[source]
get_imlist()[source]
product_in_list(imageitem)[source]
class pipeline.infrastructure.imagelibrary.ImageMetadata[source]

Bases: dict

pipeline.infrastructure.jobrequest module

class pipeline.infrastructure.jobrequest.FunctionArg(name, value)[source]

Bases: object

Class to hold named function or method arguments

class pipeline.infrastructure.jobrequest.JobRequest(fn, *args, **kw)[source]

Bases: object

execute(dry_run=False, verbose=False)[source]

Execute this job, returning any result to the caller.

Parameters
  • dry_run (boolean) – True if the job should be logged rather than executed (default: False)

  • verbose (boolean) – True if the complete invocation, including all default variables and arguments, should be logged instead of just those explicitly given (default: False)

hash_code(ignore=None)[source]

Get the numerical hash code for this JobRequest.

This code should be unique, but uniqueness is not guaranteed.
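
A minimal sketch of wrapping a plain function in a JobRequest; the function itself is illustrative:

    from pipeline.infrastructure.jobrequest import JobRequest

    def add(x, y):
        return x + y

    job = JobRequest(add, 3, y=4)
    job.execute(dry_run=True)   # log the invocation without executing it
    result = job.execute()      # execute add(3, y=4), returning 7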

class pipeline.infrastructure.jobrequest.NamelessArg(value)[source]

Bases: object

Class to hold unnamed arguments

pipeline.infrastructure.jobrequest.UUID_to_underscore(argument)[source]

Return an argument with UUIDs converted to underscores.

Parameters

argument – the FunctionArg or NamelessArg to convert

Returns

the argument with any UUIDs converted to underscores

pipeline.infrastructure.jobrequest.alphasort(argument)[source]

Return an argument with values sorted so that the log record is easier to compare to other pipeline executions.

Parameters

argument – the FunctionArg or NamelessArg to sort

Returns

a value-sorted argument

pipeline.infrastructure.jobrequest.get_fn_name(fn)[source]

Return a tuple stating the name of the function and whether the function is a CASA task.

Parameters

fn – the function to inspect

Returns

(function name, bool) tuple

pipeline.infrastructure.jobrequest.truncate_paths(arg)[source]

pipeline.infrastructure.launcher module

The launcher module contains classes to initialize the pipeline, potentially from a saved context state.

class pipeline.infrastructure.launcher.Context(output_dir=None, name=None)[source]

Bases: object

Context holds all pipeline state, consisting of metadata describing the data set, objects describing the pipeline calibration state, the tree of Results objects summarising the results of each pipeline task, and a small number of internal pipeline variables and objects.

The aim of the Context class is to provide one central object to which all pipeline state is attached. Keeping all state in one object makes it easy to persist this one object, and thus all state, to disk as a Python pickle, allowing pipeline sessions to be interrupted and resumed.

project_summary

project summary information.

project_structure

ALMA project structure information.

observing_run

the top-level (ObservingRun) through which all other pipeline.domain objects can be accessed

callibrary

the (CalLibrary) object holding the calibration state for registered measurement sets

calimlist

the (ImageLibrary) object holding final images of calibrators

sciimlist

the (ImageLibrary) object holding final images of science targets

results

the list of (Result) objects holding summaries of the pipeline run, one (Result) for each task.

output_dir

the directory to which pipeline data should be sent

raw_dir

the directory holding the raw ASDMs or measurement sets (not used)

report_dir

the directory where pipeline HTML reports should be sent

task_counter

integer holding the index of the last task to complete

subtask_counter

integer holding the index of the last subtask to complete

name

name of the context; also forms the root of the filename used for the pickled state

imaging_mode

imaging mode string; may be used to switch between imaging parameter heuristics; currently only used for deciding what products to export

property output_dir
property products_dir
property report_dir
save(filename=None)[source]
set_state(cls, name, value)[source]

Set a context property using the class name, property name and property value. The class name should be one of:

  1. ‘ProjectSummary’

  2. ‘ProjectStructure’

  3. ‘PerformanceParameters’

Background: see CAS-9497 - add infrastructure to translate values from intent.xml to setter functions in casa_pipescript.

Parameters
  • cls – class identifier

  • name – property to set

  • value – value to set

Returns

property stage
class pipeline.infrastructure.launcher.Pipeline(context=None, output_dir='./', loglevel='info', casa_version_check=True, name=None, plotlevel='default', path_overrides={})[source]

Bases: object

Pipeline is the entry point for initialising the pipeline. It is responsible for creating new Context objects and for loading saved Contexts from disk.

TODO replace this class with a static factory method on Context?

close()[source]
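
A minimal sketch of starting a session and persisting its state. It assumes the Pipeline instance exposes the newly created Context as a .context attribute; the file name and property value are illustrative:

    import pipeline.infrastructure.launcher as launcher

    p = launcher.Pipeline(output_dir='./', loglevel='info')
    context = p.context     # assumed attribute holding the new Context

    # translate a project directive into context state (see set_state above)
    context.set_state('ProjectSummary', 'piname', 'A. Observer')

    # pickle all pipeline state so the session can be resumed later
    context.save('mysession.context')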

pipeline.infrastructure.logging module

class pipeline.infrastructure.logging.CASALogHandler(log=None)[source]

Bases: logging.Handler

A handler class which writes logging records, appropriately formatted, to the CASA log.

emit(record)[source]

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream.

flush()[source]

Flushes the stream.

static get_casa_priority(lvl)[source]
class pipeline.infrastructure.logging.CapturingHandler(level=30)[source]

Bases: logging.Handler

A handler class which buffers logging records above a certain threshold in memory.

close()[source]

Close the handler.

This version just flushes and chains to the parent class’ close().

emit(record)[source]

Emit a record.

Append the record to the buffer.

flush()[source]

Override to implement custom flushing behaviour.

This version just zaps the buffer to empty.

class pipeline.infrastructure.logging.Code(code)[source]

Bases: object

class pipeline.infrastructure.logging.Frame(frame)[source]

Bases: object

class pipeline.infrastructure.logging.SuspendCapturingLogger[source]

Bases: object

class pipeline.infrastructure.logging.Traceback(tb)[source]

Bases: object

class pipeline.infrastructure.logging.UTCFormatter(fmt=None, datefmt=None, style='%')[source]

Bases: logging.Formatter

converter()

gmtime([seconds]) -> (tm_year, tm_mon, tm_mday, tm_hour, tm_min, tm_sec, tm_wday, tm_yday, tm_isdst)

Convert seconds since the Epoch to a time tuple expressing UTC (a.k.a. GMT). When ‘seconds’ is not passed in, convert the current time instead.

If the platform supports them, the tm_gmtoff and tm_zone fields are available as attributes only.

pipeline.infrastructure.logging.add_handler(handler)[source]

Add given handler to all registered loggers.

pipeline.infrastructure.logging.get_logger(name, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', stream=sys.stdout, level=None, filename=None, filemode='w', filelevel=None, propagate=False, addToCasaLog=True)[source]

Do basic configuration for the logging system. Similar to logging.basicConfig but the logger name is configurable and both a file output and a stream output can be created. Returns a logger object.

The default behaviour is to create a StreamHandler which writes to sys.stdout, set a formatter using the “%(message)s” format string, and add the handler to the named logger.

A number of optional keyword arguments may be specified, which can alter the default behaviour.

Parameters
  • name – Logger name

  • format – handler format string

  • datefmt – handler date/time format specifier

  • stream – initialize the StreamHandler using stream (None disables the stream, default=sys.stdout)

  • level – logger level (default=current pipeline log level).

  • filename – create FileHandler using filename (default=None)

  • filemode – open filename with specified filemode (‘w’ or ‘a’)

  • filelevel – logger level for file logger (default=``level``)

  • propagate – propagate message to parent (default=False)

  • addToCasaLog – emit log message to CASA logs too (default=True)

Returns

logging.Logger object
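
A minimal usage sketch; the logger name and file name are illustrative:

    from pipeline.infrastructure import logging as pipeline_logging

    # log to stdout and append to a file, mirroring messages to the CASA log
    LOG = pipeline_logging.get_logger('pipeline.example',
                                      filename='example.log',
                                      filemode='a')
    LOG.info('starting example stage')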

pipeline.infrastructure.logging.remove_handler(handler)[source]

Remove specified handler from all registered loggers.

pipeline.infrastructure.logging.set_logging_level(logger=None, level='info')[source]
pipeline.infrastructure.logging.suspend_handler(handler_class)[source]

Remove and return any handler of the given class from the registered loggers.

Parameters

handler_class – the class to remove

Returns

set of handler classes removed by this call

pipeline.infrastructure.mpihelpers module

class pipeline.infrastructure.mpihelpers.AsyncTask(executable)[source]

Bases: object

get_result()[source]

Get the result from the executed task.

This method blocks until execution of the asynchronous task is complete.

Returns

the Result returned by the executing task

Return type

pipeline.infrastructure.api.Result

Raises

PipelineException – if the task did not complete successfully.

class pipeline.infrastructure.mpihelpers.Executable[source]

Bases: object

abstract get_executable()[source]

Recreate and return the executable object. The executable object should have an .execute() function.

class pipeline.infrastructure.mpihelpers.SyncTask(task, executor=None)[source]

Bases: object

get_result()[source]

Get the result from the executed task.

This method starts execution of the wrapped task and blocks until execution is complete.

Returns

the Result returned by the executing task

Return type

pipeline.infrastructure.api.Result

Raises

pipeline.infrastructure.exceptions.PipelineException – if the task did not complete successfully.

class pipeline.infrastructure.mpihelpers.Tier0FunctionCall(fn, *args, **kwargs)[source]

Bases: object

get_executable()[source]
class pipeline.infrastructure.mpihelpers.Tier0JobRequest(creator_fn, job_args)[source]

Bases: pipeline.infrastructure.mpihelpers.Executable

get_executable()[source]

Recreate and return the executable object. The executable object should have an .execute() function.

class pipeline.infrastructure.mpihelpers.Tier0PipelineTask(task_cls, task_args, context_path)[source]

Bases: pipeline.infrastructure.mpihelpers.Executable

get_executable()[source]

Recreate and return the executable object. The executable object should have an .execute() function.

pipeline.infrastructure.mpihelpers.is_mpi_ready()[source]
pipeline.infrastructure.mpihelpers.mpiexec(tier0_executable)[source]

Execute a pipeline task.

This function is used to recreate and execute tasks on cluster nodes.

Parameters

tier0_executable – the Tier0Executable task to execute

Returns

the Result returned by executing the task

pipeline.infrastructure.mpihelpers.parse_mpi_input_parameter(input_arg)[source]
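
A minimal sketch of distributing a function call. It assumes AsyncTask accepts any executable providing get_executable(), such as Tier0FunctionCall; the function itself is illustrative:

    from pipeline.infrastructure import mpihelpers

    def double(x):
        # stand-in for real pipeline work
        return 2 * x

    # wrap the call so it can be recreated on a cluster node
    executable = mpihelpers.Tier0FunctionCall(double, 21)

    if mpihelpers.is_mpi_ready():
        task = mpihelpers.AsyncTask(executable)
        # get_result() blocks until the asynchronous execution completes
        result = task.get_result()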

pipeline.infrastructure.pipelineqa module

The pipelineqa module contains base classes and plugin registry for the pipeline’s QA framework.

This module contains four key classes:

  • QAScore: the base class for QA scores

  • QAScorePool: the container for lists of QAScores

  • QAPlugin: the base class for task-specific QA handlers

  • QARegistry: the registry and manager for QA plug-ins

Tasks provide and register their own QA handlers that each extends QAPlugin. These QA handlers analyse the results of a task, in the process adding QA scores to the Result for display in the web log.

The pipeline QA framework is activated whenever a Results instance is accepted into the pipeline context. The pipeline QA framework operates by calling is_handler_for(result) on each registered QAPlugin, passing it the accepted Results instance for inspection. QAPlugins that claim to handle the Result are given the Result for processing. In this step, the QA framework calls QAPlugin.handle(context, result), the method overridden by the task-specific QAPlugin.

class pipeline.infrastructure.pipelineqa.QAOrigin(metric_name, metric_score, metric_units)

Bases: tuple

property metric_name

Alias for field number 0

property metric_score

Alias for field number 1

property metric_units

Alias for field number 2

class pipeline.infrastructure.pipelineqa.QAPlugin[source]

Bases: object

QAPlugin is the mandatory base class for all pipeline QA handlers.

Each pipeline task should create its own task-specific QA handler that extends QAPlugin and implements the QAPlugin.handle(context, result) method to perform QA analysis specific to that task. New QA handlers should specify which type of Results classes they process by defining the result_cls and child_cls class properties. If the same Results class is returned by multiple tasks, e.g., fluxscale and setjy, then the generating_task class property should also be defined, which causes the handler to be activated only when the Result instance is generated by the specified task.

The results structure for many pipeline tasks is to return a ResultsList container object that contains many task-specific Results instances, one per EB. Two QAPlugins must be registered for this type of task: one to process the per-EB Results leaf objects, and another to process the containing ResultsList, pulling up the QA scores on each per-EB Result into the ResultsList’s QAScorePool and setting a representative score. This can be achieved with two new QAPlugins, e.g.,

    # This will process the per-EB results
    class MyTaskQAPlugin(QAPlugin):
        result_cls = MyTaskResults
        child_cls = None

    # This will process the container
    class MyTaskContainerQAPlugin(QAPlugin):
        result_cls = ResultsList
        child_cls = MyTaskResults

Within QAPlugin.handle(context, result), a QA Handler can analyse, modify, or make additions to the Results instances in any way it sees fit. In practice, the standard modification is to create and add one or more new QAScore instances to the QAScorePool attribute of the Result.

Extending the QAPlugin base class automatically registers the subclass with the pipeline QA framework. However, QAPlugin must be explicitly extended, and not implicitly inherited via another subclass of QAPlugin. Put another way, if class Foo extends QAPlugin, and class Bar extends Foo, only Foo is registered with the QA framework. To register Bar, the class definition must use multiple inheritance, e.g., ‘class Bar(Foo, QAPlugin):’.

child_cls = None
generating_task = None
abstract handle(context, result)[source]
is_handler_for(result)[source]

Return True if this QAPlugin can process the Result.

Parameters

result – the task Result to inspect

Returns

True if the Result can be processed

result_cls = None
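
A minimal sketch of a concrete handler. MyTaskResults is a hypothetical task-specific Results class, and the Result is assumed to expose its QAScorePool as result.qa:

    import pipeline.infrastructure.pipelineqa as pipelineqa

    class MyTaskQAPlugin(pipelineqa.QAPlugin):
        result_cls = MyTaskResults   # hypothetical Results class
        child_cls = None

        def handle(self, context, result):
            # score 1.0 = pass; lower scores are highlighted in the web log
            score = 1.0 if result.final else 0.0   # .final is hypothetical
            origin = pipelineqa.QAOrigin(metric_name='caltable accepted',
                                         metric_score=score,
                                         metric_units='')
            result.qa.pool.append(pipelineqa.QAScore(
                score, longmsg='caltable was accepted',
                shortmsg='caltable OK', origin=origin))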
class pipeline.infrastructure.pipelineqa.QARegistry[source]

Bases: object

The registry and manager of the pipeline QA framework.

The responsibility of the QARegistry is to pass Results to QAPlugins that can handle them.

add_handler(handler)[source]
do_qa(context, result)[source]
class pipeline.infrastructure.pipelineqa.QAScore(score, longmsg='', shortmsg='', vis=None, origin=QAOrigin(metric_name='Unknown metric', metric_score='N/A', metric_units=''), weblog_location=<WebLogLocation.UNSET: 4>, hierarchy='', applies_to: Optional[pipeline.infrastructure.pipelineqa.TargetDataSelection] = None)[source]

Bases: object

class pipeline.infrastructure.pipelineqa.QAScorePool[source]

Bases: object

all_unity_longmsg = 'All QA completed successfully'
all_unity_shortmsg = 'QA pass'
property representative
class pipeline.infrastructure.pipelineqa.TargetDataSelection(session: Set[str] = None, vis: Set[str] = None, scan: Set[int] = None, spw: Set[int] = None, field: Set[int] = None, intent: Set[str] = None, ant: Set[int] = None, pol: Set[str] = None)[source]

Bases: object

TargetDataSelection is a struct to hold data selection metadata. Its various properties (vis, scan, spw, etc.) should be set to specify to which subset of data something applies.

class pipeline.infrastructure.pipelineqa.WebLogLocation(value)[source]

Bases: enum.Enum

WebLogLocation is an enumeration attached to each QA score, specifying where in the web log this QA score should be included.

ACCORDION = 2
BANNER = 1
HIDDEN = 3
UNSET = 4

pipeline.infrastructure.project module

class pipeline.infrastructure.project.LoggingModificationListener[source]

Bases: pipeline.infrastructure.project.ModificationListener

ModificationListener that logs INFO messages when the target is modified.

on_delattr(modified_obj, attr)[source]
on_setattr(modified_obj, attr, val)[source]
class pipeline.infrastructure.project.ModificationListener[source]

Bases: object

Interface for listener classes that want to be notified when an object property changes.

Notification will be received after the change has already occurred. It is not possible to veto a change using this class.

Note: this interface requires two methods to be implemented (on_setattr and on_delattr), rather than one method switching on an enumeration, to emphasise that this class is deliberately tied into the Python data model.

abstract on_delattr(modified_obj, attr)[source]
abstract on_setattr(modified_obj, attr, val)[source]
class pipeline.infrastructure.project.ModificationPublisher[source]

Bases: object

Base class that publishes an event to registered listeners when public properties of an instance of this class are set or deleted.

Notifications will only be published when the property points to a new value. Events for in-situ modifications (e.g., adding a value to an existing list) will not be received.

Notifications will be sent after the change has already occurred. It is not possible to veto a change using this implementation.

Background: see CAS-9497, which wants to log the PPR XML directives that would otherwise be missing from the casa_pipescript.py

add_listener(listener)[source]
as_dict()[source]
remove_listener(listener)[source]
class pipeline.infrastructure.project.PerformanceParameters(desired_angular_resolution='0.0arcsec', min_angular_resolution='0.0arcsec', max_angular_resolution='0.0arcsec', desired_sensitivity='0.0mJy', desired_dynamic_range=1.0, representative_source='', representative_spwid='', representative_frequency='0.0GHz', representative_bandwidth='0.0MHz', max_cube_size=- 1.0, max_product_size=- 1.0)[source]

Bases: object

class pipeline.infrastructure.project.ProjectStructure(ous_entity_type='ObsProject', ous_entity_id='unknown', ous_part_id='unknown', ous_title='undefined', ous_type='Member', ps_entity_type='ProjectStatus', ps_entity_id='unknown', ousstatus_type='OUSStatus', ousstatus_entity_id='unknown', ppr_type='SciPipeRequest', ppr_entity_id='unknown', ppr_file='', recipe_name='Undefined')[source]

Bases: object

class pipeline.infrastructure.project.ProjectSummary(proposal_code='', proposal_title='undefined', piname='undefined', observatory='ALMA Joint Observatory', telescope='ALMA')[source]

Bases: object

pipeline.infrastructure.sessionutils module

class pipeline.infrastructure.sessionutils.ParallelTemplate(inputs)[source]

Bases: pipeline.infrastructure.basetask.StandardTaskTemplate

abstract property Task

A reference to the Task class containing the implementation for this pipeline stage.

analyse(assessed)[source]

Determine the best parameters by analysing the given jobs before returning any final jobs to execute.

Parameters

jobs (a list of JobRequest) – the job requests generated by prepare()

Return type

Result

get_result_for_exception(vis, result)[source]
is_multi_vis_task = True
prepare()[source]

Prepare job requests for execution.

Parameters

parameters – the parameters to pass through to the subclass. Refer to the implementing subclass for specific information on what these parameters are.

Return type

a class implementing Result

class pipeline.infrastructure.sessionutils.VDPTaskFactory(inputs, executor, task)[source]

Bases: object

VDPTaskFactory is a class that implements the Factory design pattern, returning tasks that execute on an MPI client or locally as appropriate.

The correctness of this task depends on the correct mapping of Inputs arguments to measurement set, so it requires Inputs objects that subclass vdp.StandardInputs.

get_task(vis)[source]

Create and return a SyncTask or AsyncTask for the job.

Parameters

vis (str) – measurement set to create a job for

Returns

task object ready for execution

Return type

a tuple of (task arguments, SyncTask or AsyncTask)

class pipeline.infrastructure.sessionutils.VisResultTuple(vis, inputs, result)

Bases: tuple

property inputs

Alias for field number 1

property result

Alias for field number 2

property vis

Alias for field number 0

pipeline.infrastructure.sessionutils.as_list(o)[source]
pipeline.infrastructure.sessionutils.group_into_sessions(context, all_results)[source]

Return results grouped into lists by session.

Parameters
  • context (Context) – pipeline context

  • all_results (list) – result to be grouped

Returns

dict of sessions to results for that session

Return type

dict {session name: [result, result, ...]}

pipeline.infrastructure.sessionutils.parallel_inputs_impl()[source]

Get a vis-independent property implementation for a parallel Inputs argument.

Returns

Inputs property implementation

Return type

property

pipeline.infrastructure.sessionutils.remap_spw_int(source_ms, target_ms, spws)[source]

Map integer spw arguments from one MS to their equivalent spw in the target ms.

Parameters
  • source_ms (domain.MeasurementSet) – the MS to map spws from

  • target_ms (domain.MeasurementSet) – the MS to map spws to

  • spws – the spw argument to convert

Returns

a list of remapped integer spw IDs

Return type

list

pipeline.infrastructure.sessionutils.remap_spw_str(source_ms, target_ms, spws)[source]

Remap a string spw argument, e.g., ‘16,18,20,22’, from one MS to the equivalent map in the target ms.

Parameters
  • source_ms (domain.MeasurementSet) – the MS to map spws from

  • target_ms (domain.MeasurementSet) – the MS to map spws to

  • spws – the spw argument to convert

Returns

the remapped spw selection string

Return type

str
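
A hypothetical mapping sketch; source_ms and target_ms stand for domain.MeasurementSet objects in which spws 16 and 18 correspond to spws 0 and 1 respectively:

    from pipeline.infrastructure import sessionutils

    remapped = sessionutils.remap_spw_str(source_ms, target_ms, '16,18')
    # remapped == '0,1'

    # the integer variant behaves equivalently for lists of IDs
    assert sessionutils.remap_spw_int(source_ms, target_ms, [16, 18]) == [0, 1]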

pipeline.infrastructure.tablereader module

class pipeline.infrastructure.tablereader.AntennaTable[source]

Bases: object

static get_antenna_array(msmd)[source]
static get_antennas(msmd)[source]
class pipeline.infrastructure.tablereader.BandDescriber[source]

Bases: object

alma_bands = {'ALMA Band 1': <FrequencyRange(31.300 GHz, 45.000 GHz)>, 'ALMA Band 10': <FrequencyRange(787.000 GHz, 950.000 GHz)>, 'ALMA Band 2': <FrequencyRange(67.000 GHz, 90.000 GHz)>, 'ALMA Band 3': <FrequencyRange(84.000 GHz, 116.000 GHz)>, 'ALMA Band 4': <FrequencyRange(125.000 GHz, 163.000 GHz)>, 'ALMA Band 5': <FrequencyRange(163.000 GHz, 211.000 GHz)>, 'ALMA Band 6': <FrequencyRange(211.000 GHz, 275.000 GHz)>, 'ALMA Band 7': <FrequencyRange(275.000 GHz, 373.000 GHz)>, 'ALMA Band 8': <FrequencyRange(385.000 GHz, 500.000 GHz)>, 'ALMA Band 9': <FrequencyRange(602.000 GHz, 720.000 GHz)>}
evla_bands = {'0.7cm (Q)': <FrequencyRange(40.000 GHz, 56.000 GHz)>, '1.3cm (K)': <FrequencyRange(18.000 GHz, 26.500 GHz)>, '13cm (S)': <FrequencyRange(2.000 GHz, 4.000 GHz)>, '1cm (Ka)': <FrequencyRange(26.500 GHz, 40.000 GHz)>, '20cm (L)': <FrequencyRange(700.000 MHz, 2.000 GHz)>, '2cm (Ku)': <FrequencyRange(12.000 GHz, 18.000 GHz)>, '3cm (X)': <FrequencyRange(8.000 GHz, 12.000 GHz)>, '6cm (C)': <FrequencyRange(4.000 GHz, 8.000 GHz)>}
static get_description(f, observatory='ALMA')[source]
unknown = {'Unknown': <FrequencyRange(0.000 Hz, Infinity THz)>}
class pipeline.infrastructure.tablereader.DataDescriptionTable[source]

Bases: object

static get_descriptions(msmd, ms)[source]
class pipeline.infrastructure.tablereader.ExecblockTable[source]

Bases: object

static get_execblock_info(ms)[source]
class pipeline.infrastructure.tablereader.FieldTable[source]

Bases: object

static get_fields(msmd)[source]
class pipeline.infrastructure.tablereader.MeasurementSetReader[source]

Bases: object

static add_band_to_spws(ms)[source]
static get_measurement_set(ms_file)[source]
static get_scans(msmd, ms)[source]
class pipeline.infrastructure.tablereader.ObservationTable[source]

Bases: object

static get_project_info(msmd)[source]
class pipeline.infrastructure.tablereader.ObservingRunReader[source]

Bases: object

static get_observing_run(ms_files)[source]
class pipeline.infrastructure.tablereader.PolarizationTable[source]

Bases: object

static get_polarizations(msmd)[source]
class pipeline.infrastructure.tablereader.RetrieveByIndexContainer(items, index_fn=operator.attrgetter('id'))[source]

Bases: object

RetrieveByIndexContainer is a container for items whose numeric index or other unique identifier is stored in an instance attribute.

Retrieving by index from this container matches and returns the item with matching index attribute, which may differ from the natural position of the item in the underlying list backing store. For instance, getting item 3 with container[3] returns the item with index attribute == 3, not the item at position 3.
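
A minimal sketch; the Spw namedtuple stands in for any object with an id attribute:

    import collections
    from pipeline.infrastructure.tablereader import RetrieveByIndexContainer

    Spw = collections.namedtuple('Spw', 'id name')
    items = [Spw(id=2, name='high-res'), Spw(id=0, name='continuum')]
    container = RetrieveByIndexContainer(items)

    # retrieval matches the id attribute, not the list position
    assert container[2].name == 'high-res'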

class pipeline.infrastructure.tablereader.SBSummaryInfo(repSource, repFrequency, repBandwidth, repWindow, minAngResolution, maxAngResolution, maxAllowedBeamAxialRatio, sensitivity, dynamicRange, sbName)

Bases: tuple

property dynamicRange

Alias for field number 8

property maxAllowedBeamAxialRatio

Alias for field number 6

property maxAngResolution

Alias for field number 5

property minAngResolution

Alias for field number 4

property repBandwidth

Alias for field number 2

property repFrequency

Alias for field number 1

property repSource

Alias for field number 0

property repWindow

Alias for field number 3

property sbName

Alias for field number 9

property sensitivity

Alias for field number 7

class pipeline.infrastructure.tablereader.SBSummaryTable[source]

Bases: object

static get_observing_mode(ms)[source]
static get_sbsummary_info(ms, obsnames)[source]
class pipeline.infrastructure.tablereader.SourceTable[source]

Bases: object

static get_sources(msmd)[source]
class pipeline.infrastructure.tablereader.SpectralWindowTable[source]

Bases: object

static get_asdm_to_ms_spw_mapping(ms)[source]

Get the mapping of ASDM spectral window ID to Measurement Set spectral window ID.

This function requires the SpectralWindow and DataDescription ASDM XML files to have been copied across to the measurement set directory.

Parameters

ms – measurement set to inspect

Returns

dict of ASDM spw: MS spw

static get_data_description_spw_ids(ms)[source]

Extract a list of spectral window IDs from the DataDescription XML for an ASDM.

This function assumes the XML has been copied across to the measurement set directory.

Parameters

ms – measurement set to inspect

Returns

list of integers corresponding to ASDM spectral window IDs

static get_receiver_info(ms)[source]

Extract information about the receiver from the ASDM_RECEIVER table. The following properties are extracted:

  • receiver type (e.g., TSB, DSB, NOSB)

  • local oscillator frequencies

If multiple entries are present for the same ASDM spwid, then keep

Parameters

ms – measurement set to inspect

Returns

dict of MS spw: (receiver_type, freq_lo)

static get_spectral_window_spw_ids(ms)[source]

Extract a list of spectral window IDs from the SpectralWindow XML for an ASDM.

This function assumes the XML has been copied across to the measurement set directory.

Parameters

ms – measurement set to inspect

Returns

list of integers corresponding to ASDM spectral window IDs

static get_spectral_windows(msmd, ms)[source]
static parse_spectral_window_ids_from_xml(xml_path)[source]

Extract the spectral window ID element from each row of an XML file.

Parameters

xml_path – path for XML file

Returns

list of integer spectral window IDs

class pipeline.infrastructure.tablereader.StateTable[source]

Bases: object

static get_state_factory(msmd)[source]
static get_states(msmd)[source]
pipeline.infrastructure.tablereader.find_EVLA_band(frequency, bandlimits=None, BBAND='?4PLSCXUKAQ?')[source]

Identify the VLA band for a given frequency.

pipeline.infrastructure.taskregistry module

class pipeline.infrastructure.taskregistry.TaskMapping(casa_task, pipeline_class, comment)[source]

Bases: object

class pipeline.infrastructure.taskregistry.TaskRegistry[source]

Bases: object

get_casa_task(cls)[source]

Get the CASA task for a pipeline class.

Raises KeyError if no mapping is found.

Parameters

cls – pipeline class

Returns

name of CASA task as a string

get_comment(cls, format=True)[source]

Get the casa_commands.log entry for a pipeline class.

Parameters
  • cls – the pipeline class to get a comment for

  • format – True if comment should be wrapped

Returns

comment as a string

get_pipeline_class(name)[source]

Get a pipeline class by name.

Raises KeyError if no mapping is found.

Parameters

name – name of the pipeline class as a string

Returns

pipeline class

get_pipeline_class_for_task(task_name)[source]

Get the pipeline class used to execute a CASA task.

Raises KeyError if no mapping is found.

Parameters

task_name – name of CASA task

Returns

pipeline class

set_casa_commands_comment(comment)[source]

A decorator that is used to register the descriptive text that precedes the list of CASA commands invoked by this task, as seen in casa_commands.log.

Parameters

comment – comment to add to casa_commands.log as a string

set_comment_for_class(pipeline_class, comment)[source]

Set the comment for a task mapping.

Parameters
  • pipeline_class – pipeline class to map

  • comment – comment to set as a string

set_equivalent_casa_task(cli_task)[source]

A decorator that is used to register the mapping between a pipeline class and its identity in the CASA command line interface.

Parameters

cli_task – the CASA task name as a string

set_task_identifier(casa_task, pipeline_class)[source]

Set the CASA task for a task mapping.

Parameters
  • casa_task – CASA task name as a string

  • pipeline_class – pipeline class to map

task_mapping_class

alias of TaskMapping
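
A minimal registration sketch, assuming a module-level TaskRegistry instance is exposed as pipeline.infrastructure.task_registry; the task and class names are illustrative:

    from pipeline.infrastructure import basetask, task_registry

    @task_registry.set_equivalent_casa_task('hif_mytask')
    @task_registry.set_casa_commands_comment('MyTask does something useful.')
    class MyTask(basetask.StandardTaskTemplate):
        ...

Each decorator is expected to update the registry and return the class unchanged, so the order of the two decorators should not matter.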

pipeline.infrastructure.vdp module

vdp is a pipeline framework module that contains classes to make writing task Inputs easier.

  • InputsContainer lets task implementations operate within the scope of a single measurement set, even if the pipeline run contains multiple data sets.

  • VisDependentProperty is a reworking of pipeline properties to reduce the amount of boilerplate code required to implement an Inputs class.

Implementation details:

See the documentation on the classes, particularly VisDependentProperty, for detailed information on how the framework operates.

Examples

There are three common scenarios that use VisDependentProperty. The following examples show each scenario for an Inputs property belonging to an Inputs class that extends vdp.StandardInputs.

  1. To provide a default value that can be overridden on a per-MS basis, use the optional ‘default’ argument to VisDependentProperty, e.g.:

     myarg = VisDependentProperty(default='some value')

  2. For more sophisticated default values, e.g., a default value that is a function of other data or properties, use the @VisDependentProperty decorator. A class property decorated with @VisDependentProperty should return the default value for that property. The function will execute in the scope of a single measurement set, i.e., at the time it is called, vis is set to exactly one value. The function will be called to provide a default value for any measurement set that does not have a user override value.

     @VisDependentProperty
     def myarg(self):
         # do some processing then return the calculated value
         return 'I am a custom property for measurement set %s' % self.vis

  3. To convert or validate user input before accepting it as an Inputs argument, use the @VisDependentProperty.convert decorator, possibly alongside the getter decorator as above.

     @VisDependentProperty
     def myarg(self):
         # this will return 100 - but only if the user has not supplied
         # an override value!
         return 100

     @VisDependentProperty.convert
     def myarg(self, user_input):
         # convert then return the user input, which is provided as an
         # argument to the convert function. The converted value will be
         # returned for all subsequent 'gets'.
         return int(user_input)

class pipeline.infrastructure.vdp.InputsContainer(task_cls, context, *args, **kwargs)[source]

Bases: object

InputsContainer is the top-level container object for all task Inputs.

InputsContainer contains machinery to let Inputs classes operate purely in the scope of a single measurement set, to make both Inputs and Task implementation much simpler.

The InputsContainer operates in the scope of multiple measurement sets, and holds one Inputs instance for every measurement set within the context. At task execution time, the task is executed for each active Inputs instance. Not all the Inputs instances held by the InputsContainer need be active: the user can reduce the scope of the task to a subset of measurement sets by setting vis, which makes an Inputs instance hidden and inactive.

Tasks that operate in the scope of more than one measurement set, e.g., imaging and session-aware tasks, can disable the InputsContainer machinery by setting is_multi_vis_task to True. For these multi-vis tasks, one Inputs instance is held in an InputsContainer, but all property sets and gets pass directly through to the one underlying inputs instance.

For tasks that operate in the scope of a single measurement set, the InputsContainer class works in conjunction with VisDependentProperty to provide and process user input (eg. inputs.solint = 123) according to a set of rules:

  1. If the input is scalar and equal to ‘’ or None, all measurement sets will be mapped back to NullMarker, therefore returning the default value or custom getter function on subsequent access.

  2. If the input is a list with number of items equal to the number of measurement sets, the items will be divided up and treated as mapping one value per measurement set.

  3. Otherwise, the user input is considered as the new default value for all measurement sets.

Before the user input is stored in the dictionary, however, the input is passed through the convert function, assuming one has been provided. The convert function allows the developer to validate or convert user input to a standard format before accepting it as a new argument.
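
A sketch of these rules in action; inputs is assumed to be the InputsContainer for a task whose context contains two measurement sets:

    inputs.solint = 'int'            # rule 3: new default applied to both MSes
    inputs.solint = ['int', '30s']   # rule 2: one value per measurement set
    inputs.solint = ''               # rule 1: reset; defaults apply on next read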

as_dict()[source]
class pipeline.infrastructure.vdp.ModeInputs(context, mode=None, **parameters)[source]

Bases: pipeline.infrastructure.api.Inputs

ModeInputs is a facade for Inputs of a common task type, allowing the user to switch between task implementations by changing the ‘mode’ parameter.

Extending classes should override the _modes dictionary with a set of key/value pairs, each pair mapping the mode name key to the task class value.
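
A minimal sketch of an extending class; the mode names and task classes are illustrative:

    class ExampleModeInputs(vdp.ModeInputs):
        # map each mode name to the task class it selects
        _modes = {
            'gtype': GTypeGaincal,      # hypothetical task class
            'gspline': GSplineGaincal,  # hypothetical task class
        }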

as_dict()[source]
classmethod get_constructor_args(ignore=('self', 'context'))[source]

Get the union of all arguments accepted by this class’s constructor.

get_task()[source]

Get the task appropriate to the current Inputs.

property mode
to_casa_args()[source]

Get this Inputs object as a dictionary of CASA arguments.

Return type

a dictionary of CASA arguments

class pipeline.infrastructure.vdp.StandardInputs[source]

Bases: pipeline.infrastructure.api.Inputs

as_dict()[source]
property context

Get the context used as the base for the parameters for this Inputs.

Return type

Context

ms[source]

A VisDependentProperty; see the VisDependentProperty class below for a full description of its behaviour.

output_dir[source]

A VisDependentProperty; see the VisDependentProperty class below for a full description of its behaviour.

to_casa_args()[source]

Express these inputs as a dictionary of CASA arguments. The values in the dictionary are in a format suitable for CASA and can be directly passed to the CASA task.

Return type

a dictionary of string/??? kw/val pairs

vis

A VisDependentProperty; see the VisDependentProperty class below for a full description of its behaviour.

class pipeline.infrastructure.vdp.VisDependentProperty(fdefault=None, fconvert=None, fpostprocess=None, default=<pipeline.infrastructure.vdp.NoDefaultMarker object>, readonly=False, hidden=False, null_input=None)[source]

Bases: object

VisDependentProperty is a Python data descriptor that standardises the behaviour of pipeline Inputs properties and lets them create default values more easily.

On reading a VisDependentProperty (ie. using the dot prefix: inputs.solint), one of two things happens:

  1. If a NullMarker is found - signifying that no user input has been provided - and a ‘getter’ function has been defined, the getter function will be called to provide a default value for that measurement set.

  2. If a user has overridden the value (eg. inputs.solint = 123), that value will be retrieved.

  3. The value, either the default from step 1 or user-provided from step 2, is run through the optional postprocess function, which gives a final opportunity to change the value depending on the state/value of other properties.

A VisDependentProperty can be made read-only by specifying ‘readonly=True’ when creating the instance.

A VisDependentProperty can be hidden from the containing Inputs string representation by setting ‘hidden=True’ when creating the instance. This will hide the property from the web log and CLI getInputs calls.

Each VisDependentProperty has a set of values it considers equivalent to null. When the user sets the VDP value to one of these null values, the VDP machinery converts this to a private NullObject marker that signifies the property is now unset, resulting in the default value being returned next time the property is read. Developers can specify which values should be converted to NullObject by specifying null_input at creation time, e.g.,

solint = VisDependentProperty(default=5, null_input=[None, '', 'RESET', -1])

NO_DEFAULT = <pipeline.infrastructure.vdp.NoDefaultMarker object>
property backing_store_name

The name of the attribute holding the value for this property.

convert(fconvert)[source]

Set the function used to clean and/or convert user-supplied argument values before they are associated with the instance property.

The provided function should accept one unnamed argument, which when passed will be the user input for this measurement set. That is, after potentially being divided up into per-measurement values.

default(fdefault)[source]

Set the function used to get the attribute value when the user has not supplied an override value.

fget(owner)[source]

Gets the underlying property value from an instance of the class owning this property

Parameters

owner

Returns

fset(owner, value)[source]

Sets the property value on the instance owning this property.

Parameters
  • owner

  • value

Returns

postprocess(fpostprocess)[source]

Set the function used to process the value that is about to be returned, allowing the value to be modified or a different value to be returned based on the state of other properties.

Parameters

fpostprocess – the postprocessing function

Returns

Module contents

pipeline.infrastructure.generate_detail_plots(result=None)[source]
pipeline.infrastructure.set_plot_level(plotlevel)[source]