neuraxle.distributed.streaming

Module-level documentation for neuraxle.distributed.streaming. The rendered documentation includes an inheritance diagram of this module's classes, with their dependencies to other base modules of Neuraxle.

Streaming Pipelines for Parallel and Queued Data Processing
Neuraxle steps for streaming data in parallel in the pipeline.

Pipelines can stream data through queues, with workers for each step. Maximum queue sizes can be set, as well as the number of clones per step for the transformers.
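For illustration, a minimal streaming pipeline might look like the following sketch. The import path of Identity is an assumption; the SequentialQueuedPipeline parameters are those documented below.

    from neuraxle.base import Identity  # assumed import path
    from neuraxle.distributed.streaming import SequentialQueuedPipeline

    # Two trivial steps, each with its own worker; items are streamed
    # through bounded inter-step queues in batches of 10.
    p = SequentialQueuedPipeline([
        ('step_a', Identity()),
        ('step_b', Identity()),
    ], batch_size=10, n_workers_per_step=1, max_queue_size=5)

    outputs = p.transform(list(range(100)))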
Functions

    worker_function(queue_worker, shared_lock, context, use_savers, additional_worker_arguments)
        Worker function that transforms the items inside the queue of items to process.

Classes

    BaseQueuedPipeline(steps, batch_size, ...)
        Subclass of Pipeline. Transform data in many pipeline steps at once, in parallel, using multiprocessing Queues.

    ObservableQueueMixin(queue)
        A class to represent a step that can put items in a queue so that the step can be used to consume its tasks on its own.

    ObservableQueueStepSaver()
        Saver for observable queue steps.

    ParallelQueuedFeatureUnion(steps, batch_size, ...)
        Using QueueWorker, run all steps in parallel.

    QueueJoiner(batch_size, n_batches=None)
        Observe the results of the queue workers of type QueueWorker, and synchronize all of the workers together.

    QueueWorker(wrapped, max_queue_size, n_workers, ...)
        Start multiple Processes or Threads that process items from the queue of batches to process.

    QueuedPipelineTask(data_container, step_name=None)
        Data object to contain the tasks processed by the queued pipeline.

    SequentialQueuedPipeline(steps, batch_size, ...)
        Using QueueWorker, run all steps sequentially even if they are in separate processes or threads.
class neuraxle.distributed.streaming.ObservableQueueMixin(queue: multiprocessing.context.BaseContext.Queue)

    Bases: neuraxle.base.MixinForBaseTransformer

    A class to represent a step that can put items in a queue so that the step can be used to consume its tasks on its own.
    Once tasks are solved, their results are added to the subscribers that were subscribed with the notify call.
    A subscriber is itself an ObservableQueueMixin, and will thus consume the tasks added to it, probably in another thread.

    See also: BaseStep, QueuedPipelineTask, QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, SequentialQueuedPipeline
    __init__(queue: multiprocessing.context.BaseContext.Queue)
        Initialize self. See help(type(self)) for accurate signature.

    subscribe_step(observer_queue_worker: neuraxle.distributed.streaming.ObservableQueueMixin) → neuraxle.distributed.streaming.ObservableQueueMixin
        Subscribe a queue worker to self, so that it can be notified of new tasks to put on its queue.
        The subscribed queue workers get notified when notify_step() is called.
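    As an illustrative sketch only (in practice, workers are wired together by BaseQueuedPipeline.connect_queued_pipeline(), not by hand; the Identity import path is an assumption), two observable queue steps could be chained like this:

        from neuraxle.base import Identity  # assumed import path
        from neuraxle.distributed.streaming import QueueWorker

        producer = QueueWorker(Identity(), max_queue_size=5, n_workers=1)
        consumer = QueueWorker(Identity(), max_queue_size=5, n_workers=1)

        # After subscribing, notifying the producer's results will let the
        # consumer pick them up from its own queue, possibly in another thread.
        producer.subscribe_step(consumer)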
class neuraxle.distributed.streaming.QueuedPipelineTask(data_container, step_name=None)

    Bases: object

    Data object to contain the tasks processed by the queued pipeline.
    Attributes: step_name, data_container.

    See also: QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, SequentialQueuedPipeline
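    For instance, a task pairing a batch with the name of the step it belongs to might be built like this (a sketch; the DataContainer import path and constructor keywords are assumptions based on the type annotations used elsewhere on this page):

        from neuraxle.data_container import DataContainer  # assumed import path
        from neuraxle.distributed.streaming import QueuedPipelineTask

        batch = DataContainer(data_inputs=[0, 1, 2], expected_outputs=[0, 2, 4])
        task = QueuedPipelineTask(data_container=batch, step_name='step_a')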
class neuraxle.distributed.streaming.ObservableQueueStepSaver

    Bases: neuraxle.base.BaseSaver

    Saver for observable queue steps.

    See also: QueueWorker, neuraxle.base.BaseSaver, BaseQueuedPipeline, ParallelQueuedPipeline, SequentialQueuedPipeline
    save_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer
        Save a step, or a step's parts, using the execution context.

        Parameters:
            step (BaseTransformer) – step to save
            context (ExecutionContext) – execution context

        Returns: the saved step
    can_load(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → bool
        Returns true if we can load the given step with the given execution context.

        Parameters:
            step (BaseTransformer) – step to load
            context (ExecutionContext) – execution context to load from

        Returns: bool
    load_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer
        Load step with execution context.

        Parameters:
            step – step to load
            context – execution context to load from

        Returns: loaded base step
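    A hedged sketch of the saver's calling convention (savers are normally invoked by the framework when use_savers=True, not called directly; the step and context arguments here are placeholders provided by the framework in real use):

        from neuraxle.distributed.streaming import ObservableQueueStepSaver

        saver = ObservableQueueStepSaver()

        def reload_step_if_possible(step, context):
            # `step` is a BaseTransformer, `context` an ExecutionContext.
            step = saver.save_step(step, context)      # save the step's parts
            if saver.can_load(step, context):
                step = saver.load_step(step, context)  # reload it in the worker
            return step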
class neuraxle.distributed.streaming.QueueWorker(wrapped: neuraxle.base.BaseTransformer, max_queue_size: int, n_workers: int, use_processes: bool = True, additional_worker_arguments: List[T] = None, use_savers: bool = False)

    Bases: neuraxle.distributed.streaming.ObservableQueueMixin, neuraxle.base.MetaStep

    Start multiple Processes or Threads that process items from the queue of batches to process.
    A QueueWorker is both an observable and an observer: it notifies its subscribers with the results of the wrapped step's handle transform method, and it receives the next data containers to process from the steps it observes.

    See also: Observer, Observable, MetaStepMixin, BaseStep
    __init__(wrapped: neuraxle.base.BaseTransformer, max_queue_size: int, n_workers: int, use_processes: bool = True, additional_worker_arguments: List[T] = None, use_savers: bool = False)
        Initialize self. See help(type(self)) for accurate signature.

    start(context: neuraxle.base.ExecutionContext)
        Start multiple processes or threads with the worker function as a target.

        Parameters:
            context (ExecutionContext) – execution context
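    A sketch of how a worker could be created and started manually (in practice, BaseQueuedPipeline does this for you during _setup(); the import paths and the no-argument ExecutionContext construction are assumptions):

        from neuraxle.base import ExecutionContext, Identity  # assumed import paths
        from neuraxle.distributed.streaming import QueueWorker

        worker = QueueWorker(
            Identity(),        # wrapped step, transformed inside the workers
            max_queue_size=5,  # at most 5 batches waiting in the queue
            n_workers=2,       # two parallel workers consuming the same queue
            use_processes=True,
        )
        worker.start(ExecutionContext())  # spawns processes targeting worker_function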
neuraxle.distributed.streaming.worker_function(queue_worker: neuraxle.distributed.streaming.QueueWorker, shared_lock: multiprocessing.context.BaseContext.Lock, context: neuraxle.base.ExecutionContext, use_savers: bool, additional_worker_arguments)

    Worker function that transforms the items inside the queue of items to process.

    Parameters:
        queue_worker (QueueWorker) – step to transform
        shared_lock – lock shared between the workers
        context (ExecutionContext) – execution context
        use_savers (bool) – whether to use savers
        additional_worker_arguments – any additional arguments that need to be passed to the workers
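Conceptually, each spawned worker runs a loop along these lines. This is an illustrative paraphrase of the documented behavior, not the library's actual implementation; the `queue` and `observers` attribute names are assumptions, while get_step() and handle_transform() come from the Neuraxle base API.

    from neuraxle.distributed.streaming import QueuedPipelineTask

    # Illustrative paraphrase only; not the real worker_function body.
    def illustrative_worker_loop(queue_worker, context):
        while True:
            task = queue_worker.queue.get()  # blocks until a QueuedPipelineTask arrives
            result = queue_worker.get_step().handle_transform(task.data_container, context)
            for observer in queue_worker.observers:  # assumed attribute name
                observer.queue.put(QueuedPipelineTask(result, step_name=queue_worker.name))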
class neuraxle.distributed.streaming.BaseQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queue_size: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None)

    Bases: neuraxle.pipeline.MiniBatchSequentialPipeline

    Subclass of Pipeline. Transform data in many pipeline steps at once, in parallel, using multiprocessing Queues.

    Example usage:
        # Multiple ways of specifying the steps tuples exist to do various things:

        # step name, step
        p = SequentialQueuedPipeline([
            ('step_a', Identity()),
            ('step_b', Identity()),
        ], n_workers=1, batch_size=10, max_queue_size=10)

        # step name, number of workers, step
        p = SequentialQueuedPipeline([
            ('step_a', 1, Identity()),
            ('step_b', 1, Identity()),
        ], batch_size=10, max_queue_size=10)

        # step name, number of workers, and max size
        p = SequentialQueuedPipeline([
            ('step_a', 1, 10, Identity()),
            ('step_b', 1, 10, Identity()),
        ], batch_size=10)

        # step name, number of workers for each step, and additional argument for each worker
        p = SequentialQueuedPipeline([
            ('step_a', 1, [('host', 'host1'), ('host', 'host2')], Identity())
        ], batch_size=10)

        # step name, number of workers for each step, additional argument for each worker, and max size
        p = SequentialQueuedPipeline([
            ('step_a', 1, [('host', 'host1'), ('host', 'host2')], 10, Identity())
        ], batch_size=10)

        # It's also possible to do parallel feature unions:
        n_workers = 4
        worker_arguments = [('hyperparams', HyperparameterSamples({'multiply_by': 2}))
                            for _ in range(n_workers)]
        p = ParallelQueuedFeatureUnion([
            ('1', n_workers, worker_arguments, MultiplyByN()),
        ], batch_size=10, max_queue_size=5)

        outputs = p.transform(list(range(100)))
    Parameters:
        steps – pipeline steps.
        batch_size – number of elements to combine into a single batch.
        n_workers_per_step – number of workers to spawn per step.
        max_queue_size – max number of batches inside the processing queue between the workers.
        data_joiner – transformer step to join streamed batches together at the end of the pipeline.
        use_processes – use processes instead of threads for parallel processing. multiprocessing.context.Process is used by default.
        use_savers – use savers to serialize steps for parallel processing. Recommended if using processes instead of threads.
        keep_incomplete_batch – (Optional.) A bool that indicates whether or not the last batch should be dropped in the case it has fewer than batch_size elements; the default behavior is to keep the smaller batch.
        default_value_data_inputs – default fill value of the data inputs, for padding and for values outside the iteration range, or AbsentValuesNullObject to trim absent values from the batch.
        default_value_expected_outputs – default fill value of the expected outputs, for padding and for values outside the iteration range, or AbsentValuesNullObject to trim absent values from the batch.

    See also: QueueWorker, QueueJoiner, CustomPipelineMixin, Pipeline
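    For example, the batching parameters interact as in the following sketch (the Identity import path is an assumption; AbsentValuesNullObject's import path is taken from the type annotations above):

        from neuraxle.base import Identity  # assumed import path
        from neuraxle.data_container import AbsentValuesNullObject
        from neuraxle.distributed.streaming import SequentialQueuedPipeline

        # 25 items with batch_size=10 gives two full batches plus one incomplete
        # batch of 5 items, kept because keep_incomplete_batch defaults to True.
        p = SequentialQueuedPipeline(
            [('step_a', Identity())],
            batch_size=10,
            keep_incomplete_batch=True,
            default_value_data_inputs=AbsentValuesNullObject(),  # trim instead of padding
        )
        outputs = p.transform(list(range(25)))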
    __init__(steps, batch_size, n_workers_per_step=None, max_queue_size=None, data_joiner=None, use_processes=False, use_savers=False, keep_incomplete_batch=True, default_value_data_inputs=None, default_value_expected_outputs=None)
        Initialize self. See help(type(self)) for accurate signature; the parameters are as documented for the class above.
    _initialize_steps_as_tuple(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]]) → List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]]
        Wrap each step in a QueueWorker to allow data to flow through many pipeline steps at once, in parallel.

        Parameters:
            steps (NameNWorkerStepTupleList) – (name, n_workers, step) tuples

        Returns: steps as tuple
    _create_queue_worker(step: Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]])
    _get_step_params(step)
        Return all the params necessary to create the QueuedPipeline for the given step.
    _will_process(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.data_container.DataContainer, neuraxle.base.ExecutionContext]
        Set up the streaming pipeline before any handler methods run.

        Parameters:
            data_container (DataContainer) – data container
            context (ExecutionContext) – execution context
    _setup(context: neuraxle.base.ExecutionContext = None) → neuraxle.base.BaseTransformer
        Connect the queued workers together so that the data can correctly flow through the pipeline.

        Parameters:
            context (ExecutionContext) – execution context

        Returns: step
fit_transform_data_container
(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.pipeline.Pipeline, neuraxle.data_container.DataContainer][source]¶ Fit transform sequentially if any step is fittable. Otherwise transform in parallel.
- Parameters
data_container (DataContainer) – data container
context (ExecutionContext) – execution context
- Returns
    transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer
        Transform the data container.

        Parameters:
            data_container (DataContainer) – data container to transform
            context (ExecutionContext) – execution context

        Returns: data container
    _did_transform(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer
        Stop all of the workers after the transform. Also, join the data using self.data_joiner.

        Parameters:
            data_container (DataContainer) – data container
            context (ExecutionContext) – execution context

        Returns: data container
    get_n_batches(data_container) → int
        Get the total number of batches that the queue joiner is supposed to receive.

        Parameters:
            data_container (DataContainer) – data container to transform

        Returns: int
    connect_queued_pipeline()
        Connect all the queued workers together so that the data can flow through each step.
    send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)
        Send batches to the queued pipeline. This blocks if there is no more space available in the multiprocessing queues.
        Workers might return batches in a different order, but the queue joiner will reorder them at the end, using the summary ids of the received batches.

        Parameters:
            batch_index (int) – batch index
            data_container (DataContainer) – data container batch
class neuraxle.distributed.streaming.SequentialQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queue_size: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None)

    Bases: neuraxle.distributed.streaming.BaseQueuedPipeline

    Using QueueWorker, run all steps sequentially, even if they are in separate processes or threads.
    This is a parallel pipeline that uses queues to communicate between the steps, and that parallelizes each step using many workers in different processes or threads.
    This pipeline is useful when the steps are independent of each other and can be run in parallel. This is especially the case when the steps are not fittable, such as steps inheriting from NonFittableMixin. Otherwise, fitting may not be parallelized, although the steps can still run in parallel for the transformation.

    See also: BasePipeline, minibatches(), AbsentValuesNullObject, QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, QueueJoiner, Observer, Observable
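    For instance, reusing the (name, n_workers, max_queue_size, step) tuple format documented above for BaseQueuedPipeline (the Identity import path is an assumption):

        from neuraxle.base import Identity  # assumed import path
        from neuraxle.distributed.streaming import SequentialQueuedPipeline

        # Each step gets two workers and a bounded queue of at most 10 batches.
        p = SequentialQueuedPipeline([
            ('step_a', 2, 10, Identity()),
            ('step_b', 2, 10, Identity()),
        ], batch_size=10, use_processes=False)  # threads instead of processes

        outputs = p.transform(list(range(100)))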
    get_n_batches(data_container) → int
        Get the number of batches to process.

        Parameters:
            data_container – data container to transform

        Returns: number of batches
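    As a worked example of this count (inferred from the documented keep_incomplete_batch behavior: keeping the incomplete batch rounds up, dropping it rounds down):

        import math

        n_items, batch_size = 25, 10
        n_if_kept = math.ceil(n_items / batch_size)  # 3 batches, incomplete batch kept
        n_if_dropped = n_items // batch_size         # 2 batches, incomplete batch dropped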
    send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)
        Send the batches to process to the first queued worker.

        Parameters:
            batch_index (int) – batch index
            data_container (DataContainer) – data container batch
class neuraxle.distributed.streaming.ParallelQueuedFeatureUnion(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queue_size: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None)

    Bases: neuraxle.distributed.streaming.BaseQueuedPipeline

    Using QueueWorker, run all steps in parallel, as a feature union.

    See also: QueueWorker, BaseQueuedPipeline, SequentialQueuedPipeline, QueueJoiner, Observer, Observable
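    The module docstring's feature-union example, restated here (the MultiplyByN and HyperparameterSamples import paths are assumptions):

        from neuraxle.distributed.streaming import ParallelQueuedFeatureUnion
        from neuraxle.hyperparams.space import HyperparameterSamples  # assumed import path
        from neuraxle.steps.numpy import MultiplyByN  # assumed import path

        n_workers = 4
        worker_arguments = [('hyperparams', HyperparameterSamples({'multiply_by': 2}))
                            for _ in range(n_workers)]

        # Each of the 4 workers applies MultiplyByN in parallel on batches of 10.
        p = ParallelQueuedFeatureUnion([
            ('1', n_workers, worker_arguments, MultiplyByN()),
        ], batch_size=10, max_queue_size=5)

        outputs = p.transform(list(range(100)))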
    get_n_batches(data_container)
        Get the number of batches to process by the queue joiner.
    connect_queued_pipeline()
        Connect the queue joiner to all of the queued workers to process data in parallel.
    send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)
        Send the batches to process to all of the queued workers.

        Parameters:
            batch_index (int) – batch index
            data_container (DataContainer) – data container batch
class neuraxle.distributed.streaming.QueueJoiner(batch_size, n_batches=None)

    Bases: neuraxle.distributed.streaming.ObservableQueueMixin, neuraxle.pipeline.Joiner

    Observe the results of the queue workers of type QueueWorker, and synchronize all of the workers together.

    See also: QueuedPipeline, Observer, ListDataContainer, DataContainer
    __init__(batch_size, n_batches=None)
        Initialize self. See help(type(self)) for accurate signature.
    _teardown() → neuraxle.base.BaseTransformer
        Properly clean the queue, summary ids, and results during teardown.

        Returns: self, after teardown
    join(original_data_container: neuraxle.data_container.DataContainer) → neuraxle.data_container.DataContainer
        Return the accumulated results received by the on-next method of this observer.

        Returns: transformed data container
    _raise_exception_throwned_by_workers_if_needed(data_containers: neuraxle.data_container.ListDataContainer)