compute.parallel

Utilities for parallel computation.

pyphi.compute.parallel.get_num_processes()

Return the number of processes to use in parallel.

class pyphi.compute.parallel.ExceptionWrapper(exception)

A picklable wrapper suitable for passing exception tracebacks through instances of multiprocessing.Queue.

Parameters

exception (Exception) – The exception to wrap.

reraise()

Re-raise the exception.
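The following is a minimal sketch of the general technique, not PyPhi's actual implementation. Traceback objects cannot be pickled, so the sketch assumes the wrapper stores the formatted traceback text alongside the exception. The names PicklableError and wrap_failure are hypothetical.

```python
import pickle
import traceback


class PicklableError:
    """Hypothetical stand-in for an exception wrapper (not PyPhi's code)."""

    def __init__(self, exception):
        self.exception = exception
        # Traceback objects cannot be pickled, so keep the formatted text.
        self.formatted = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )

    def reraise(self):
        """Re-raise the exception, attaching the original traceback text."""
        raise self.exception from RuntimeError(self.formatted)


def wrap_failure():
    """Trigger an error and return it wrapped, as a worker might."""
    try:
        1 / 0
    except ZeroDivisionError as error:
        return PicklableError(error)


# Round-trip through pickle, which is what a multiprocessing.Queue does.
restored = pickle.loads(pickle.dumps(wrap_failure()))
```

Calling restored.reraise() in the parent process raises the original exception type, with the worker-side traceback text available on its __cause__.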

class pyphi.compute.parallel.MapReduce(iterable, *context)

An engine for doing heavy computations over an iterable.

This is similar to multiprocessing.Pool, but allows computations to short-circuit, and supports both parallel and sequential computations.

Parameters
  • iterable (Iterable) – A collection of objects to perform a computation over.

  • *context – Any additional data necessary to complete the computation.

Any subclass of MapReduce must implement three methods:

- ``empty_result``,
- ``compute`` (map), and
- ``process_result`` (reduce).

The engine includes a built-in tqdm progress bar; this can be disabled by setting pyphi.config.PROGRESS_BARS to False.

Parallel operations start a daemon thread which handles log messages sent from worker processes.

Subprocesses spawned by MapReduce cannot spawn more subprocesses; be aware of this when composing nested computations. This is not an issue in practice because it is typically most efficient to parallelize only the top-level computation.

description = ''
empty_result(*context)

Return the default result with which to begin the computation.

static compute(obj, *context)

Map over a single object from self.iterable.

process_result(new_result, old_result)

Reduce handler.

Every time a new result is generated by compute, this method is called with the result and the previous (accumulated) result. This method compares or collates these two values, returning the new result.

Setting self.done to True in this method will abort the remainder of the computation, returning this final result.
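To illustrate how the three required methods and self.done fit together, here is a self-contained sketch. It does not import PyPhi: MiniMapReduce is a hypothetical, sequential-only stand-in that mimics the interface described above, and FirstAbove and tracked are illustrative names.

```python
class MiniMapReduce:
    """Hypothetical sequential stand-in for the documented interface."""

    def __init__(self, iterable, *context):
        self.iterable = iterable
        self.context = context
        self.done = False

    def run(self):
        result = self.empty_result(*self.context)
        for obj in self.iterable:
            new_result = self.compute(obj, *self.context)
            result = self.process_result(new_result, result)
            if self.done:
                break  # short-circuit: skip the remaining objects
        return result


class FirstAbove(MiniMapReduce):
    """Find the first square that exceeds a threshold given as context."""

    def empty_result(self, threshold):
        return None

    @staticmethod
    def compute(obj, threshold):
        return obj * obj  # map step

    def process_result(self, new_result, old_result):
        (threshold,) = self.context
        if new_result > threshold:
            self.done = True  # abort the rest of the computation
            return new_result
        return old_result


seen = []


def tracked(values):
    """Yield values while recording which ones were actually consumed."""
    for value in values:
        seen.append(value)
        yield value


found = FirstAbove(tracked(range(10)), 20).run()
```

Here found is 25, and seen stops at 5: the objects 6 through 9 are never computed because process_result set self.done.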

init_progress_bar()

Initialize and return a progress bar.

static worker(compute, task_queue, result_queue, log_queue, complete, parent_config, *context)

A worker process, run by multiprocessing.Process.

start_parallel()

Initialize all queues and start the worker processes and the log thread.

initialize_tasks()

Load the input queue to capacity.

Overfilling the queue causes a deadlock, because queue.put blocks when the queue is full; further tasks are therefore enqueued only as results are returned.
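The fill-then-refill pattern can be sketched as follows. This is an illustration under simplifying assumptions, not PyPhi's code: it uses threads and queue.Queue instead of processes and multiprocessing.Queue, and run_bounded, worker_fn, num_workers, and capacity are hypothetical names.

```python
import queue
import threading


def run_bounded(items, worker_fn, num_workers=2, capacity=4):
    """Process items through a bounded task queue without overfilling it."""
    tasks = queue.Queue(maxsize=capacity)
    results = queue.Queue()
    pending = iter(items)
    in_flight = 0

    def worker():
        while True:
            item = tasks.get()
            if item is None:  # sentinel: no more work (items must not be None)
                return
            results.put(worker_fn(item))

    threads = [
        threading.Thread(target=worker, daemon=True)
        for _ in range(num_workers)
    ]
    for thread in threads:
        thread.start()

    def maybe_put_task():
        """Enqueue the next task, if any are waiting."""
        nonlocal in_flight
        for item in pending:
            tasks.put(item)
            in_flight += 1
            return True
        return False

    # Load the task queue up to its capacity, never beyond.
    for _ in range(capacity):
        if not maybe_put_task():
            break

    collected = []
    while in_flight:
        collected.append(results.get())
        in_flight -= 1
        maybe_put_task()  # one result came back, so one slot is free

    # Orderly shutdown: one sentinel per worker.
    for _ in threads:
        tasks.put(None)
    for thread in threads:
        thread.join()
    return collected


doubled = sorted(run_bounded(range(10), lambda x: x * 2))
```

Because a new task is enqueued only after a result has been read, the number of outstanding tasks never exceeds capacity, so the put call in the parent never blocks.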

maybe_put_task()

Enqueue the next task, if there are any waiting.

run_parallel()

Perform the computation in parallel, reading results from the output queue and passing them to process_result.

finish_parallel()

Orderly shutdown of workers.

run_sequential()

Perform the computation sequentially, only holding two computed objects in memory at a time.

run(parallel=True)

Perform the computation.

Keyword Arguments

parallel (boolean) – If True, run the computation in parallel. Otherwise, operate sequentially.

class pyphi.compute.parallel.LogThread(q)

Thread which handles log records sent from MapReduce processes.

It listens to an instance of multiprocessing.Queue, forwarding log messages to the PyPhi log handler.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with positional and keyword arguments taken from the args and kwargs arguments, respectively.

pyphi.compute.parallel.configure_worker_logging(queue)

Configure a worker process to log all messages to queue.
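The division of labor between configure_worker_logging and LogThread can be sketched with the standard library alone. This is not PyPhi's implementation: it assumes a logging.handlers.QueueHandler on the worker side, uses queue.Queue in place of multiprocessing.Queue so the example runs in one process, and the names configure_queue_logging, ForwardingThread, and ListHandler are hypothetical.

```python
import logging
import logging.handlers
import queue
import threading


def configure_queue_logging(logger, log_queue):
    """Hypothetical helper: route every record from logger into log_queue."""
    logger.handlers = [logging.handlers.QueueHandler(log_queue)]
    logger.setLevel(logging.DEBUG)
    logger.propagate = False


class ForwardingThread(threading.Thread):
    """Hypothetical listener: re-emits queued records via a target logger."""

    def __init__(self, log_queue, target_logger):
        super().__init__(daemon=True)
        self.log_queue = log_queue
        self.target_logger = target_logger

    def run(self):
        while True:
            record = self.log_queue.get()
            if record is None:  # sentinel: stop listening
                break
            self.target_logger.handle(record)


class ListHandler(logging.Handler):
    """Collects messages in a list so the result can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log_queue = queue.Queue()
sink = ListHandler()
main_logger = logging.getLogger("sketch.main")
main_logger.addHandler(sink)
main_logger.setLevel(logging.DEBUG)
main_logger.propagate = False

listener = ForwardingThread(log_queue, main_logger)
listener.start()

# "Worker" side: all records go to the queue instead of a local handler.
worker_logger = logging.getLogger("sketch.worker")
configure_queue_logging(worker_logger, log_queue)
worker_logger.info("task %d finished", 3)

log_queue.put(None)  # tell the listener to stop
listener.join()
```

After the listener has joined, sink.messages contains the worker's message, which shows that the record travelled through the queue and was handled on the listening side.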