Parallelization¶
CodeSnippet: You can download all the code on this page from the code snippets directory.
The parallel package adds the ability to parallelize the training
and execution of MDP flows. This package is split into two decoupled parts.
The first part consists of a parallel extension for the familiar MDP
structures of nodes and flows. In principle all MDP nodes already
support parallel execution, since copies of a node can be made and used
in parallel. Parallelization of the training, on the other hand, depends
on the specific node or algorithm. For nodes which can be trained in a
parallelized way there is the extension class ParallelExtensionNode,
which adds the fork and join methods. When providing a parallel
extension for custom node classes you should implement _fork and
_join. Secondly, there is the ParallelFlow class, which
internally splits the training or execution into tasks that are then
processed in parallel.
The second part consists of the schedulers. A scheduler takes tasks and processes them in a more or less parallel way (e.g. in multiple Python processes). A scheduler deals with the more technical aspects of the parallelization, but does not need to know anything about nodes and flows.
Basic Examples¶
In the following example we parallelize a simple Flow consisting of
PCA and quadratic SFA, so that it makes use of multiple cores on a modern CPU:
>>> import mdp
>>> import numpy as np
>>> node1 = mdp.nodes.PCANode(input_dim=100, output_dim=10)
>>> node2 = mdp.nodes.SFA2Node(input_dim=10, output_dim=10)
>>> parallel_flow = mdp.parallel.ParallelFlow([node1, node2])
>>> parallel_flow2 = parallel_flow.copy()
>>> parallel_flow3 = parallel_flow.copy()
>>> n_data_chunks = 10
>>> data_iterables = [[np.random.random((50, 100))
... for _ in range(n_data_chunks)]] * 2
>>> scheduler = mdp.parallel.ProcessScheduler()
>>> parallel_flow.train(data_iterables, scheduler=scheduler)
>>> scheduler.shutdown()
Only two additional lines were needed to parallelize the training of the
flow. All one has to do is use a ParallelFlow instead of the normal
Flow and provide a scheduler. The ProcessScheduler will
automatically create as many Python processes as there are CPU cores.
The parallel flow hands the training task for each data chunk over to
the scheduler, which in turn distributes them across the available
worker processes. The results are then returned to the flow, which puts
them together in the right way. Note that the shutdown method should
always be called at the end to make sure that the resources used by the
scheduler are cleaned up properly. To be safe, one should therefore put
the shutdown call into the finally clause of a try/finally statement:
>>> scheduler = mdp.parallel.ProcessScheduler()
>>> try:
... parallel_flow2.train(data_iterables, scheduler=scheduler)
... finally:
... scheduler.shutdown()
The Scheduler class also supports the context manager interface of Python.
One can therefore use a with statement:
>>> with mdp.parallel.ProcessScheduler() as scheduler:
... parallel_flow3.train(data_iterables, scheduler=scheduler)
The with statement ensures that scheduler.shutdown is automatically
called (even if there is an exception).
Scheduler¶
The scheduler classes in MDP are derived from the Scheduler base
class (which itself does not implement any parallelization). The
standard choice at the moment is the ProcessScheduler, which
distributes the incoming tasks over multiple Python processes
(circumventing the global interpreter lock, or GIL). The performance gain
is highly dependent on the specific situation, but can potentially scale
well with the number of CPU cores (in one real-world case we saw a
speed-up factor of 4.2 on an Intel Core i7 processor with 4 physical / 8
logical cores).
MDP has experimental support for the Parallel Python library in the mdp.parallel.pp_support
package. In principle this makes it possible to parallelize across
multiple machines. Recently we also added the thread-based
ThreadScheduler. While it is limited by the GIL, it can still
achieve a real-world speedup (since NumPy releases the GIL when
possible) and it incurs less overhead than the
ProcessScheduler.
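Swapping schedulers requires no other changes. For example, execution of the parallel_flow trained above can be distributed over threads; this is a minimal sketch, assuming the flow has finished its training:
>>> chunks = [np.random.random((50, 100)) for _ in range(4)]
>>> with mdp.parallel.ThreadScheduler() as scheduler:
...     y = parallel_flow.execute(chunks, scheduler=scheduler)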
(The following information is only relevant for people who want to implement custom scheduler classes.)
The first important method of the scheduler class is add_task. This
method takes two arguments: data and task_callable, which can be
a function or an object with a __call__ method. The return value of
the task_callable is the result of the task. If task_callable is
None then the last provided task_callable will be used. This
splitting into callable and data makes it possible to implement caching
of the task_callable in the scheduler and its workers (caching is
turned on by default in the ProcessScheduler). To further influence
caching one can derive from the TaskCallable class, which has a
fork method to generate new callables in order to preserve the
original cached callable. For MDP training and execution there are
corresponding classes derived from TaskCallable which are
automatically used, so normally there is no need to worry about this.
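For a stateless callable it is safe to hand out the same instance again, so a minimal (hypothetical) TaskCallable subclass could look like this sketch:
>>> class SquareCallable(mdp.parallel.TaskCallable):
...     # stateless task callable: square the incoming data
...     def __call__(self, data):
...         return data**2
...     def fork(self):
...         # no internal state, so the cached instance can be reused as-is
...         return self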
After submitting all the tasks with add_task you can call
the get_results method. This method returns all the task results,
normally in a list. If there are open tasks in the scheduler then
get_results will wait until all the tasks are finished (it blocks). You can
also check the status of the scheduler via the
n_open_tasks property, which gives you the number of open tasks.
After using the scheduler you should always call the shutdown method,
otherwise you might get error messages from processes that were not
closed properly.
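Taken together, a scheduler can also be used directly, without any flow. The following sketch uses the serial Scheduler base class for simplicity (the square function is made up for illustration); the same calls work with ProcessScheduler or ThreadScheduler:
>>> def square(x):
...     return x**2
>>> with mdp.parallel.Scheduler() as scheduler:
...     for i in range(4):
...         scheduler.add_task(i, square)
...     results = scheduler.get_results()
>>> results
[0, 1, 4, 9]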
Internally an instance of the base class mdp.parallel.ResultContainer is
used to store the results in the scheduler. By providing your own
result container to the scheduler you can modify how results are stored.
For example, the default result container is an instance of
OrderedResultContainer. The ParallelFlow class by default makes
sure that the right container is used for the task (this can be
overridden manually via the overwrite_result_container parameter of the
train and execute methods).
Parallel Nodes¶
If you want to parallelize your own nodes you have to provide parallel
extensions for them. The ParallelExtensionNode base class has
the new template methods fork and join.
fork should return a new node instance, which can then be
trained somewhere else (e.g. in a different process) with the usual train
method. Afterwards join is called on the original node, with the
forked node as the argument. Training the fork and joining it back should be
equivalent to calling train directly on the original node.
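As an illustration, the fork/join cycle can also be carried out by hand. This is a minimal sketch, assuming the parallel extension is active (ParallelFlow activates it automatically during training):
>>> node = mdp.nodes.PCANode(input_dim=20, output_dim=3)
>>> with mdp.extension('parallel'):
...     forked = node.fork()
...     forked.train(np.random.random((100, 20)))
...     node.join(forked)
>>> node.stop_training()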
During execution nodes are not forked by default; instead they are simply
copied (for example, they are pickled and sent to the Python worker
processes). Nodes can explicitly request to be forked and joined during
execution as well (like during training). This is done by overriding the
use_execute_fork method, which by default returns False. For example,
nodes that record data during execution can use this feature to become
compatible with parallelization.
When writing a custom parallel node extension you should only overwrite
the _fork and _join methods, which are automatically called by
fork and join. fork and join take care of the
standard node attributes like the dimensions. You should also look at
the source code of a parallel node like ParallelPCANode to get a
better idea of how to parallelize nodes. By overwriting
use_execute_fork to return True you can force forking and
joining during execution. Note that the same _fork and _join
implementation is called as during training, so if necessary one should
add a node.is_training() check there to determine the correct
action.
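The following is a minimal sketch of this pattern for a hypothetical SumNode that accumulates the column sums of its training data (both class names are made up for illustration):
>>> class SumNode(mdp.Node):
...     # toy node that accumulates the column sums of the training data
...     def __init__(self, input_dim=None, dtype=None):
...         super(SumNode, self).__init__(input_dim, input_dim, dtype)
...         self.sum = None
...     def _train(self, x):
...         if self.sum is None:
...             self.sum = x.sum(axis=0)
...         else:
...             self.sum += x.sum(axis=0)
>>> class ParallelSumNode(mdp.parallel.ParallelExtensionNode, SumNode):
...     def _fork(self):
...         # return a fresh, untrained instance that can be trained elsewhere
...         return self.__class__(input_dim=self.input_dim, dtype=self.dtype)
...     def _join(self, forked_node):
...         # merge the statistics accumulated by the forked node
...         if self.sum is None:
...             self.sum = forked_node.sum
...         else:
...             self.sum += forked_node.sum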
Currently we provide the following parallel nodes:
ParallelPCANode, ParallelWhiteningNode, ParallelSFANode,
ParallelSFA2Node, ParallelFDANode, ParallelHistogramNode,
ParallelAdaptiveCutoffNode, ParallelFlowNode, ParallelLayer,
ParallelCloneLayer (the last three are derived from the hinet
package).