# Source code for pipeline.infrastructure.utils.diagnostics

"""
The diagnostics module contains utility functions used to help profile the
pipeline.
"""
import functools
import os
import platform
import subprocess
import threading

import pipeline.extern.ps_mem as ps_mem
from .. import jobrequest
from .. import logging
from .. import mpihelpers

LOG = logging.get_logger(__name__)

__all__ = ['enable_fd_logs', 'enable_memstats']


def enable_memstats():
    """
    Enable per-job memory usage logging.

    Registers pre- and post-execution hooks on the jobrequest module so that
    the private/shared/total memory footprint of this process is written to
    the log before and after every job. When MPI is ready, the same command is
    pushed to every MPI server so each server logs its own usage too.

    No-op (with an error logged) on macOS, where ps_mem cannot measure
    memory, or when memory statistics logging is already enabled.
    """
    if platform.system() == 'Darwin':
        LOG.error('Cannot measure memory on OS X.')
        return
    if enable_memstats.enabled:
        LOG.error('enable_memstats() already enabled')
        return

    LOG.info('Enabling memory statistics logging')

    # deferred import: pulling in measures at module load time would risk a
    # circular import
    import pipeline.domain.measures as measures

    def get_hook_fn(msg):
        pid = os.getpid()

        def log_mem_usage(jobrequest):
            sorted_cmds, shareds, _, _ = ps_mem.get_memory_usage([pid, ], False, True)
            for cmd in sorted_cmds:
                # cmd is (key, total_kB); shareds maps key -> shared kB, so
                # private = total - shared
                private = measures.FileSize(cmd[1] - shareds[cmd[0]],
                                            measures.FileSizeUnits.KILOBYTES)
                shared = measures.FileSize(shareds[cmd[0]],
                                           measures.FileSizeUnits.KILOBYTES)
                total = measures.FileSize(cmd[1],
                                          measures.FileSizeUnits.KILOBYTES)

                LOG.info('%s%s: private=%s shared=%s total=%s' % (
                    msg, jobrequest.fn.__name__, str(private), str(shared),
                    str(total)))

            # BUG FIX: the original compared these small ints with 'is',
            # which tests identity, not equality. It only appeared to work
            # because CPython caches small integers; '==' is the correct and
            # portable comparison.
            vm_accuracy = ps_mem.shared_val_accuracy()
            if vm_accuracy == -1:
                LOG.warning("Shared memory is not reported by this system. "
                            "Values reported will be too large, and totals "
                            "are not reported")
            elif vm_accuracy == 0:
                LOG.warning("Shared memory is not reported accurately by "
                            "this system. Values reported could be too "
                            "large, and totals are not reported.")
            elif vm_accuracy == 1:
                LOG.warning("Shared memory is slightly over-estimated by "
                            "this system for each program, so totals are "
                            "not reported.")

        return log_mem_usage

    jobrequest.PREHOOKS.append(get_hook_fn('Memory usage before '))
    jobrequest.POSTHOOKS.append(get_hook_fn('Memory usage after '))
    enable_memstats.enabled = True

    if mpihelpers.is_mpi_ready():
        mpi_server_list = mpihelpers.MPIEnvironment.mpi_server_rank_list()
        mpihelpers.mpiclient.push_command_request(
            'pipeline.infrastructure.utils.enable_memstats()',
            block=True,
            target_server=mpi_server_list)
enable_memstats.enabled = False


class Interval(object):
    """
    Call a function repeatedly at a fixed cadence.

    Each expiry of the underlying threading.Timer re-arms the timer and then
    invokes the stored callable, so calls continue until stop() is invoked.
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """
        Runs the function at a specified interval with given arguments.
        """
        # Avoid mutable defaults by normalising to fresh empty containers.
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs

        self.interval = interval
        # Freeze the call so the timer only needs a zero-argument callable.
        self.function = functools.partial(function, *args, **kwargs)
        self.running = False
        self._timer = None

    def __call__(self):
        """
        Handler function for calling the partial and continuting.
        """
        # Clear the running flag, re-arm for the next tick, then fire.
        self.running = False
        self.start()
        self.function()

    def start(self):
        """
        Starts the interval and lets it run.
        """
        # Guard clause: a second start while armed would double-schedule.
        if self.running:
            return
        timer = threading.Timer(self.interval, self)
        timer.start()
        self._timer = timer
        self.running = True

    def stop(self):
        """
        Cancel the interval (no more function calls).
        """
        if self._timer:
            self._timer.cancel()
        self._timer = None
        self.running = False
def enable_fd_logs(interval_secs=60):
    """
    Log file descriptors to the CASA log every n seconds.

    Registers pre- and post-execution hooks on the jobrequest module: the
    pre-hook starts an Interval that lists /proc/<pid>/fd while the job runs,
    and the post-hook cancels it. When MPI is ready, the same command is
    pushed to every MPI server.

    No-op (with an error logged) on macOS, which has no /proc filesystem, or
    when file descriptor logging is already enabled.

    :param interval_secs: logging cadence in seconds (default=60)
    :return:
    """
    if platform.system() == 'Darwin':
        LOG.error('Cannot list file descriptors on MacOS')
        return
    if enable_fd_logs.enabled:
        # BUG FIX: this message previously said 'enable_memstats() already
        # enabled' — a copy/paste error from the sibling function.
        LOG.error('enable_fd_logs() already enabled')
        return

    LOG.info('Enabling file descriptor logging')

    import pipeline.infrastructure.jobrequest as jobrequest

    pid = os.getpid()

    def list_file_descriptors(msg, job_name):
        # Best-effort: /proc entries can vanish, so tolerate a failed ls.
        try:
            out = subprocess.check_output(['ls', '-l', '/proc/{}/fd'.format(pid)])
        except subprocess.CalledProcessError:
            LOG.info('Could not list file descriptors for PID {}'.format(pid))
        else:
            LOG.info('File descriptors {} {}:\n{}'.format(msg, job_name, out))

    def get_hook_fn(msg, start=False, cancel=False):
        def log_file_descriptors(jobrequest):
            job_name = jobrequest.fn.__name__
            if start:
                # Only one Interval runs at a time; stop any leftover timer
                # before starting one for this job.
                if enable_fd_logs.interval:
                    enable_fd_logs.interval.stop()
                LOG.info('Logging file descriptors every {} seconds during {}'.format(
                    interval_secs, job_name))
                enable_fd_logs.interval = Interval(interval_secs,
                                                   list_file_descriptors,
                                                   args=['during', job_name])
                enable_fd_logs.interval.start()
            if cancel and enable_fd_logs.interval:
                enable_fd_logs.interval.stop()
                enable_fd_logs.interval = None
            list_file_descriptors(msg, job_name)
        return log_file_descriptors

    jobrequest.PREHOOKS.append(get_hook_fn('before', start=True))
    jobrequest.POSTHOOKS.append(get_hook_fn('after', cancel=True))
    enable_fd_logs.enabled = True

    if mpihelpers.is_mpi_ready():
        cmd = 'pipeline.infrastructure.utils.enable_fd_logs({})'.format(interval_secs)
        mpi_server_list = mpihelpers.MPIEnvironment.mpi_server_rank_list()
        mpihelpers.mpiclient.push_command_request(cmd, block=True,
                                                  target_server=mpi_server_list)
# Module-level state for enable_fd_logs: the Interval currently logging file
# descriptors (None when idle) and a guard flag preventing the hooks from
# being registered twice.
enable_fd_logs.interval = None
enable_fd_logs.enabled = False