Package mdp :: Package parallel :: Class ParallelFlow
[hide private]
[frames] | no frames]

Class ParallelFlow


A parallel flow provides the methods for parallel training / execution.

Nodes in the flow which are not derived from ParallelNode are trained in
the normal way. The training is also done normally if fork() raises a
TrainingPhaseNotParallelException. This can be intentionally used by the
node to request local training without forking.
Parallel execution on the other hand should work for all nodes, since it
only relies on the copy method of nodes.
The stop_training method is always called locally, with no forking or
copying involved.

Both parallel training and execution can be done conveniently by providing
a scheduler instance to the train or execute method.
It is also possible to manage the tasks manually. This is done via the
methods setup_parallel_training (or execution), get_task and use_results.
The code of the train / execute method can serve as an example how to use
these methods and process the tasks by a scheduler.

Instance Methods [hide private]
 
__init__(self, flow, verbose=False, **kwargs)
Initialize the internal variables.
 
_create_execute_task(self)
Create and return a single execution task.
 
_create_train_task(self)
Create and return a single training task without callable.
 
_local_train_phase(self, data_iterable)
Perform a single training phase locally.
 
_next_train_phase(self)
Find the next phase or node for parallel training.
 
_post_stop_training_hook(self)
Hook method that is called after stop_training is called.
 
execute(self, iterable, nodenr=None, scheduler=None, execute_callable_class=None, overwrite_result_container=True)
Train all trainable nodes in the flow.
 
get_task(self)
Return a task either for either training or execution.
 
setup_parallel_execution(self, iterable, nodenr=None, execute_callable_class=<class 'mdp.parallel.FlowExecuteCallable'>)
Prepare the flow for handing out tasks to do the execution.
 
setup_parallel_training(self, data_iterables, train_callable_class=<class 'mdp.parallel.FlowTrainCallable'>)
Prepare the flow for handing out tasks to do the training.
 
train(self, data_iterables, scheduler=None, train_callable_class=None, overwrite_result_container=True, **kwargs)
Train all trainable nodes in the flow.
 
use_results(self, results)
Use the result from the scheduler.

Inherited from unreachable.newobject: __long__, __native__, __nonzero__, __unicode__, next

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __new__, __reduce__, __reduce_ex__, __setattr__, __sizeof__, __subclasshook__

    Inherited from Flow
 
__add__(self, other)
 
__call__(self, iterable, nodenr=None)
Calling an instance is equivalent to call its 'execute' method.
 
__contains__(self, item)
 
__delitem__(self, key)
 
__getitem__(self, key)
 
__iadd__(self, other)
 
__iter__(self)
 
__len__(self)
 
__repr__(self)
repr(x)
 
__setitem__(self, key, value)
 
__str__(self)
str(x)
 
_check_dimension_consistency(self, out, inp)
Raise ValueError when both dimensions are set and different.
 
_check_nodes_consistency(self, flow=None)
Check the dimension consistency of a list of nodes.
 
_check_value_type_isnode(self, value)
 
_close_last_node(self)
 
_execute_seq(self, x, nodenr=None)
 
_inverse_seq(self, x)
 
_propagate_exception(self, except_, nodenr)
 
_stop_training_hook(self)
Hook method that is called before stop_training is called.
 
_train_check_iterables(self, data_iterables)
Return the data iterables after some checks and sanitizing.
 
_train_node(self, data_iterable, nodenr)
Train a single node in the flow.
 
append(flow, node)
append node to flow end
 
copy(self, protocol=None)
Return a deep copy of the flow.
 
extend(flow, iterable)
extend flow by appending elements from the iterable
 
insert(flow, index, node)
insert node before index
 
inverse(self, iterable)
Process the data through all nodes in the flow backwards (starting from the last node up to the first node) by calling the inverse function of each node. Of course, all nodes in the flow must be invertible.
node
pop(flow, index=...)
remove and return node at index (default last)
 
save(self, filename, protocol=-1)
Save a pickled serialization of the flow to 'filename'. If 'filename' is None, return a string.
 
set_crash_recovery(self, state=True)
Set crash recovery capabilities.
Static Methods [hide private]
    Inherited from Flow
 
_get_required_train_args(node)
Return arguments in addition to self and x for node.train.
Properties [hide private]
  is_parallel_executing
Return True if parallel execution is underway.
  is_parallel_training
Return True if parallel training is underway.
  task_available
Return True if tasks are available, otherwise False.

Inherited from object: __class__

Method Details [hide private]

__init__(self, flow, verbose=False, **kwargs)
(Constructor)

 
Initialize the internal variables.

Note that the crash_recovery flag is is not supported, so it is
disabled.

Overrides: object.__init__

_create_execute_task(self)

 
Create and return a single execution task.

Returns None if data iterator end is reached.

_create_train_task(self)

 
Create and return a single training task without callable.

Returns None if data iterator end is reached.

_local_train_phase(self, data_iterable)

 
Perform a single training phase locally.

The internal _train_callable_class is used for the training.

_next_train_phase(self)

 
Find the next phase or node for parallel training.

When it is found the corresponding internal variables are set.
Nodes which are not derived from ParallelNode are trained locally.
If a fork() fails due to a TrainingPhaseNotParallelException
in a certain train phase, then the training is done locally as well
(but fork() is tested again for the next phase).

_post_stop_training_hook(self)

 
Hook method that is called after stop_training is called.

execute(self, iterable, nodenr=None, scheduler=None, execute_callable_class=None, overwrite_result_container=True)

 
Train all trainable nodes in the flow.

If a scheduler is provided the execution will be done in parallel on
the scheduler.

iterable -- An iterable or iterator that returns data arrays that are
    used as input to the flow. Alternatively, one can specify one
    data array as input.
    If a custom execute_callable_class is used to preprocess the data
    then other data types can be used as well.
nodenr -- Same as in normal flow, the flow is only executed up to the
    nodenr.
scheduler -- Value can be either None for normal execution (default
    value) or a Scheduler instance for parallel execution with the
    scheduler.
execute_callable_class -- Class used to create execution callables for
    the scheduler. By specifying your own class you can implement data
    transformations before the data is actually fed into the flow
    (e.g. from 8 bit image to 64 bit double precision).
    Note that the execute_callable_class is only used if a scheduler was
    provided. If a scheduler is provided the default class used is
    NodeResultContainer.
overwrite_result_container -- If set to True (default value) then
    the result container in the scheduler will be overwritten with an
    instance of OrderedResultContainer (unless it is already an
    instance of OrderedResultContainer). Otherwise the results might
    have a different order than the data chunks, which could mess up
    any subsequent analysis.

Overrides: Flow.execute

get_task(self)

 
Return a task either for either training or execution.

A a one task buffer is used to make task_available work.
tasks are available as long as need_result returns False or all the
training / execution is done. If no tasks are available a
NoTaskException is raised.

setup_parallel_execution(self, iterable, nodenr=None, execute_callable_class=<class 'mdp.parallel.FlowExecuteCallable'>)

 
Prepare the flow for handing out tasks to do the execution.

After calling setup_parallel_execution one has to pick up the
tasks with get_task, run them and finally return the results via
use_results. use_results will then return the result as if the flow was
executed in the normal way.

iterable -- An iterable or iterator that returns data arrays that are
    used as input to the flow. Alternatively, one can specify one
    data array as input.
    If a custom execute_callable_class is used to preprocess the data
    then other data types can be used as well.
nodenr -- Same as in normal flow, the flow is only executed up to the
    nodenr.
execute_callable_class -- Class used to create execution callables for
    the scheduler. By specifying your own class you can implement data
    transformations before the data is actually fed into the flow
    (e.g. from 8 bit image to 64 bit double precision).

setup_parallel_training(self, data_iterables, train_callable_class=<class 'mdp.parallel.FlowTrainCallable'>)

 
Prepare the flow for handing out tasks to do the training.

After calling setup_parallel_training one has to pick up the
tasks with get_task, run them and finally return the results via
use_results. tasks are available as long as task_available returns
True. Training may require multiple phases, which are each closed by
calling use_results.

data_iterables -- A list of iterables, one for each node in the flow.
    The iterators returned by the iterables must
    return data arrays that are then used for the node training.
    See Flow.train for more details.
    If a custom train_callable_class is used to preprocess the data
    then other data types can be used as well.
train_callable_class -- Class used to create training callables for the
    scheduler. By specifying your own class you can implement data
    transformations before the data is actually fed into the flow
    (e.g. from 8 bit image to 64 bit double precision).

train(self, data_iterables, scheduler=None, train_callable_class=None, overwrite_result_container=True, **kwargs)

 
Train all trainable nodes in the flow.

If a scheduler is provided the training will be done in parallel on the
scheduler.

data_iterables -- A list of iterables, one for each node in the flow.
    The iterators returned by the iterables must
    return data arrays that are then used for the node training.
    See Flow.train for more details.
    If a custom train_callable_class is used to preprocess the data
    then other data types can be used as well.
scheduler -- Value can be either None for normal training (default
    value) or a Scheduler instance for parallel training with the
    scheduler.
    If the scheduler value is an iterable or iterator then it is
    assumed that it contains a scheduler for each training phase.
    After a node has been trained the scheduler is shutdown. Note that
    you can e.g. use a generator to create the schedulers just in time.
    For nodes which are not trained the scheduler can be None.
train_callable_class -- Class used to create training callables for the
    scheduler. By specifying your own class you can implement data
    transformations before the data is actually fed into the flow
    (e.g. from 8 bit image to 64 bit double precision).
    Note that the train_callable_class is only used if a scheduler was
    provided. By default NodeResultContainer is used.
overwrite_result_container -- If set to True (default value) then
    the result container in the scheduler will be overwritten with an
    instance of NodeResultContainer (unless it is already an instance
    of NodeResultContainer). This improves the memory efficiency.

Overrides: Flow.train

use_results(self, results)

 
Use the result from the scheduler.

During parallel training this will start the next training phase.
For parallel execution this will return the result, like a normal
execute would.

results -- Iterable containing the results, normally the return value
    of scheduler.ResultContainer.get_results().
    The individual results can be the return values of the tasks.


Property Details [hide private]

is_parallel_executing

Return True if parallel execution is underway.

Get Method:
unreachable.is_parallel_executing(self) - Return True if parallel execution is underway.

is_parallel_training

Return True if parallel training is underway.

Get Method:
unreachable.is_parallel_training(self) - Return True if parallel training is underway.

task_available

Return True if tasks are available, otherwise False.

If False is returned this can indicate that results are needed to
continue training.

Get Method:
unreachable.task_available(self) - Return True if tasks are available, otherwise False.