Parallelization

Code snippet: you can download all the code on this page from the code snippets directory.

The parallel package adds the ability to parallelize the training and execution of MDP flows. This package is split into two decoupled parts.

The first part consists of a parallel extension for the familiar MDP structures of nodes and flows. In principle all MDP nodes already support parallel execution, since copies of a node can be made and used in parallel. Parallelization of the training, on the other hand, depends on the specific node or algorithm. Nodes that can be trained in a parallelized way are covered by the extension class ParallelExtensionNode, which adds the fork and join methods. When providing a parallel extension for custom node classes you should implement _fork and _join. Secondly, there is the ParallelFlow class, which internally splits the training or execution into tasks that are then processed in parallel.

The second part consists of the schedulers. A scheduler takes tasks and processes them in a more or less parallel way (e.g. in multiple Python processes). A scheduler deals with the more technical aspects of the parallelization, but does not need to know anything about nodes and flows.

Basic Examples

In the following example we parallelize a simple Flow consisting of PCA and quadratic SFA, so that it makes use of multiple cores on a modern CPU:

>>> import mdp
>>> import numpy as np
>>> node1 = mdp.nodes.PCANode(input_dim=100, output_dim=10)
>>> node2 = mdp.nodes.SFA2Node(input_dim=10, output_dim=10)
>>> parallel_flow = mdp.parallel.ParallelFlow([node1, node2])
>>> parallel_flow2 = parallel_flow.copy()
>>> parallel_flow3 = parallel_flow.copy()
>>> n_data_chunks = 10
>>> data_iterables = [[np.random.random((50, 100))
...                    for _ in range(n_data_chunks)]] * 2
>>> scheduler = mdp.parallel.ProcessScheduler()
>>> parallel_flow.train(data_iterables, scheduler=scheduler)
>>> scheduler.shutdown()

Only two additional lines were needed to parallelize the training of the flow. All one has to do is use a ParallelFlow instead of the normal Flow and provide a scheduler. The ProcessScheduler will automatically create as many Python processes as there are CPU cores. The parallel flow hands the training task for each data chunk over to the scheduler, which in turn distributes them across the available worker processes. The results are then returned to the flow, which puts them together in the right way. Note that the shutdown method should always be called at the end to make sure that the resources used by the scheduler are cleaned up properly. One should therefore put the shutdown call into a safe try/finally statement:

>>> scheduler = mdp.parallel.ProcessScheduler()
>>> try:
...     parallel_flow2.train(data_iterables, scheduler=scheduler)
... finally:
...     scheduler.shutdown()

The Scheduler class also supports the context manager interface of Python. One can therefore use a with statement:

>>> with mdp.parallel.ProcessScheduler() as scheduler:
...     parallel_flow3.train(data_iterables, scheduler=scheduler)

The with statement ensures that scheduler.shutdown is automatically called (even if there is an exception).

Scheduler

The scheduler classes in MDP are derived from the Scheduler base class (which itself does not implement any parallelization). The standard choice at the moment is the ProcessScheduler, which distributes the incoming tasks over multiple Python processes (circumventing the global interpreter lock, or GIL). The performance gain is highly dependent on the specific situation, but can potentially scale well with the number of CPU cores (in one real-world case we saw a speed-up factor of 4.2 on an Intel Core i7 processor with 4 physical / 8 logical cores).
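If you do not want to occupy all cores, the number of worker processes can also be set explicitly. The n_processes keyword argument used here reflects the ProcessScheduler signature in recent MDP versions; check the docstring of your installation:

>>> scheduler = mdp.parallel.ProcessScheduler(n_processes=2)
>>> # ... submit tasks or hand the scheduler to a ParallelFlow ...
>>> scheduler.shutdown()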

MDP has experimental support for the Parallel Python library in the mdp.parallel.pp_support package. In principle this makes it possible to parallelize across multiple machines. Recently we also added the thread-based ThreadScheduler. While it is limited by the GIL, it can still achieve a real-world speedup (since NumPy releases the GIL when possible), and it causes less overhead than the ProcessScheduler.
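For example, the parallel flow trained above could be executed with a ThreadScheduler like this (a sketch assuming the n_threads keyword argument; as above, verify the exact signature against your MDP version):

>>> with mdp.parallel.ThreadScheduler(n_threads=4) as scheduler:
...     y = parallel_flow.execute(np.random.random((50, 100)),
...                               scheduler=scheduler)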

(The following information is only relevant for people who want to implement custom scheduler classes.)

The first important method of the scheduler class is add_task. This method takes two arguments: data and task_callable, which can be a function or an object with a __call__ method. The return value of the task_callable is the result of the task. If task_callable is None then the last provided task_callable will be used. This splitting into callable and data makes it possible to implement caching of the task_callable in the scheduler and its workers (caching is turned on by default in the ProcessScheduler). To further influence caching one can derive from the TaskCallable class, which has a fork method to generate new callables in order to preserve the original cached callable. For MDP training and execution there are corresponding classes derived from TaskCallable which are automatically used, so normally there is no need to worry about this.

After submitting all the tasks with add_task you can call the get_results method. This method returns all the task results, normally in a list. If there are open tasks in the scheduler then get_results will wait until all the tasks are finished (it blocks). You can also check the status of the scheduler by looking at the n_open_tasks property, which gives you the number of open tasks. After using the scheduler you should always call the shutdown method, otherwise you might get error messages from processes that were not closed properly.
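As a minimal illustration of this interface, the following sketch uses the Scheduler base class directly, so the tasks are simply processed one after the other in the local process; thanks to the default OrderedResultContainer the results should come back in task order:

>>> def square(x):
...     return x**2
>>> scheduler = mdp.parallel.Scheduler()
>>> for i in range(4):
...     scheduler.add_task(i, square)  # data plus task_callable
>>> scheduler.get_results()  # blocks until all tasks are finished
[0, 1, 4, 9]
>>> scheduler.shutdown()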

Internally an instance of the base class mdp.parallel.ResultContainer is used for the storage of the results in the scheduler. By providing your own result container to the scheduler you can modify how the results are stored. For example, the default result container is an instance of OrderedResultContainer. The ParallelFlow class by default makes sure that the right container is used for the task (this can be overridden manually via the overwrite_result_container parameter of the train and execute methods).
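As a sketch, a custom container might simply collect the results in arrival order. Note that the method names add_result and get_results below are an assumption based on the description above; consult the ResultContainer source for the actual interface:

>>> class UnorderedResultContainer(mdp.parallel.ResultContainer):
...     """Hypothetical container that ignores the task order."""
...     def __init__(self):
...         self._results = []
...     def add_result(self, result, task_index):
...         self._results.append(result)
...     def get_results(self):
...         results, self._results = self._results, []
...         return results
>>> scheduler = mdp.parallel.Scheduler(
...     result_container=UnorderedResultContainer())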

Parallel Nodes

If you want to parallelize your own nodes you have to provide parallel extensions for them. The ParallelExtensionNode base class has the new template methods fork and join. fork should return a new node instance. This new instance can then be trained somewhere else (e.g. in a different process) with the usual train method. Afterwards join is called on the original node, with the forked node as the argument. This should be equivalent to calling train directly on the original node.
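To see the protocol in action, here is a sketch of a manual fork/join round trip. It assumes that the parallel extension is activated through MDP's extension mechanism (under the extension name "parallel"), so that PCANode gains the fork and join methods; the training data is random filler:

>>> mdp.activate_extension("parallel")
>>> node = mdp.nodes.PCANode(input_dim=50, output_dim=5)
>>> forked = node.fork()  # new instance, can be trained elsewhere
>>> forked.train(np.random.random((200, 50)))
>>> node.join(forked)     # equivalent to training node directly
>>> node.stop_training()
>>> mdp.deactivate_extension("parallel")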

During execution nodes are not forked by default; instead they are simply copied (for example, they are pickled and sent to the Python worker processes). Nodes can explicitly request to be forked and joined during execution (as during training) by overriding the use_execute_fork method, which by default returns False. For example, nodes that record data during execution can use this feature to become compatible with parallelization.

When writing a custom parallel node extension you should only overwrite the _fork and _join methods, which are automatically called by fork and join. fork and join take care of the standard node attributes like the dimensions. You should also look at the source code of a parallel node like ParallelPCANode to get a better idea of how to parallelize nodes. By overwriting use_execute_fork to return True you can force forking and joining during execution. Note that the same _fork and _join implementations are called as during training, so if necessary one should add a node.is_training() check there to determine the correct action.
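As a toy illustration, consider a hypothetical SumNode (invented for this example, not part of MDP) that accumulates the sum of its training data. Its parallel extension, following the same subclassing pattern as ParallelPCANode, could look like this:

>>> class SumNode(mdp.Node):
...     """Toy node that sums up all of its training data."""
...     def __init__(self, input_dim=None, dtype=None):
...         super(SumNode, self).__init__(input_dim=input_dim,
...                                       dtype=dtype)
...         self.sum = 0.0
...     def _train(self, x):
...         self.sum += x.sum()
>>> class ParallelSumNode(mdp.parallel.ParallelExtensionNode,
...                       SumNode):
...     def _fork(self):
...         # return a fresh instance so the partial sums of the
...         # forked copies do not overlap
...         return self.__class__(input_dim=self.input_dim,
...                               dtype=self.dtype)
...     def _join(self, forked_node):
...         # merge the partial result of the forked copy
...         self.sum += forked_node.sum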

Currently we provide the following parallel nodes: ParallelPCANode, ParallelWhiteningNode, ParallelSFANode, ParallelSFA2Node, ParallelFDANode, ParallelHistogramNode, ParallelAdaptiveCutoffNode, ParallelFlowNode, ParallelLayer, ParallelCloneLayer (the last three are the parallel versions of structures from the hinet package).