Source code for pipeline.infrastructure.taskregistry
import textwrap
[docs]class TaskMapping(object):
def __init__(self, casa_task, pipeline_class, comment):
self.casa_task = casa_task
self.pipeline_class = pipeline_class
self.comment = comment
def __repr__(self):
return '<TaskMapping({!r}, {!r}, {!r})>'.format(self.casa_task, self.pipeline_class, self.comment)
[docs]class TaskRegistry(object):
# The object used to map pipeline classes to CASA tasks
task_mapping_class = TaskMapping
def __init__(self):
self.task_map = []
[docs] def get_casa_task(self, cls):
"""
Get the CASA task for a pipeline class.
Raises KeyError if no mapping is found.
:param cls: pipeline class
:return: name of CASA task as a string
"""
matching = [c.casa_task for c in self.task_map if c.pipeline_class == cls]
if not matching:
raise KeyError('No task registered for {!s}'.format(cls.__name__))
return matching[0]
[docs] def get_comment(self, cls, format=True):
"""
Get the casa_commands.log entry for a pipeline class.
:param cls: the pipeline class to get a comment for
:param format: True if comment should be wrapped
:return: comment as a string
"""
matching = [c for c in self.task_map if c.pipeline_class == cls]
if not matching:
raise KeyError('No comment registered for {!s}'.format(cls.__name__))
match = matching[0]
comment = match.comment
if comment is None:
comment = 'No comment registered for {!s}'.format(match.casa_task)
if not format:
return comment
wrapped = textwrap.wrap('# ' + comment, subsequent_indent='# ', width=78, break_long_words=False)
return '%s\n#\n' % '\n'.join(wrapped)
[docs] def get_pipeline_class(self, name):
"""
Get a pipeline class by name.
Raises KeyError if no mapping is found.
:param name: name of the pipeline class as a string
:return: pipeline class
"""
matching = [c.pipeline_class for c in self.task_map if c.pipeline_class.__name__ == name]
if not matching:
raise KeyError('No pipeline class registered for {!s}'.format(name))
return matching[0]
[docs] def get_pipeline_class_for_task(self, task_name):
"""
Get the pipeline class used to execute a CASA task.
Raises KeyError if no mapping is found.
:param task_name: name of CASA task
:return: pipeline class
"""
matching = [c.pipeline_class for c in self.task_map if c.casa_task == task_name]
if not matching:
raise KeyError('No pipeline class registered for {!s}'.format(task_name))
return matching[0]
[docs] def set_casa_commands_comment(self, comment):
"""A decorator that is used to register the descriptive text that
preceeds the list of CASA commands invoked by this task, as seen in
casa_commands.log
:param comment: comment to add to casa_commands.log as a string
"""
def decorator(pipeline_class):
self.set_comment_for_class(pipeline_class, comment)
return pipeline_class
return decorator
[docs] def set_comment_for_class(self, pipeline_class, comment):
"""
Set the comment for a task mapping.
:param pipeline_class: pipeline class to map
:param comment: comment to set as a string
"""
existing = [o for o in self.task_map if o.pipeline_class == pipeline_class]
to_add = []
if not existing:
mapping = self.task_mapping_class(None, pipeline_class, comment)
to_add.append(mapping)
for o in existing:
mapping = self.task_mapping_class(o.casa_task, pipeline_class, comment)
to_add.append(mapping)
for o in existing:
self.task_map.remove(o)
for o in to_add:
self.task_map.append(o)
[docs] def set_equivalent_casa_task(self, cli_task):
"""A decorator that is used to register the mapping between a pipeline
class and its identity in the CASA command line interface.
:param cli_task: the CASA task name as a string
"""
def decorator(f):
self.set_task_identifier(cli_task, f)
return f
return decorator
[docs] def set_task_identifier(self, casa_task, pipeline_class):
"""
Set the CASA task for a task mapping.
:param comment: casa task name as a string
:param pipeline_class: pipeline class to map
"""
# assume 1:1 mapping between pipeline class and CASA tasks
existing = [o for o in self.task_map if o.pipeline_class == pipeline_class]
to_add = []
if not existing:
mapping = self.task_mapping_class(casa_task, pipeline_class, None)
to_add.append(mapping)
for o in existing:
mapping = self.task_mapping_class(casa_task, pipeline_class, o.comment)
to_add.append(mapping)
for o in existing:
self.task_map.remove(o)
for o in to_add:
self.task_map.append(o)
task_registry = TaskRegistry()