neuraxle.distributed.streaming¶
Module-level documentation for neuraxle.distributed.streaming. An inheritance diagram, including dependencies to other base modules of Neuraxle, accompanies this module in the rendered documentation.
Streaming Pipelines for Parallel and Queued Data Processing¶
Neuraxle steps for streaming data in parallel in the pipeline.
Pipelines can stream data in queues, with workers for each step. Max queue sizes can be set, as well as the number of clones per step for the transformers.
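As a quick orientation, here is a minimal usage sketch distilled from the fuller example under BaseQueuedPipeline below; it assumes Identity is importable from neuraxle.base and uses the constructor parameters documented on this page:

from neuraxle.base import Identity
from neuraxle.distributed.streaming import SequentialQueuedPipeline

# Two identity steps, one worker each, streaming minibatches of 10 items
# through bounded queues (at most 10 queued minibatches per step):
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], n_workers_per_step=1, batch_size=10, max_queued_minibatches=10)
outputs = p.transform(list(range(100)))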
Functions

- pickle_exception_into_task(task, err)
- worker_function(worker, context, use_savers, logging_queue) – Worker function that transforms the items inside the queue of items to process.

Classes

- BaseQueuedPipeline – Subclass of Pipeline. Transform data in many pipeline steps at once, in parallel, using multiprocessing queues.
- MinibatchError – Data object to represent an error in a minibatch.
- ParallelQueuedFeatureUnion – Using QueueWorker, run all steps in parallel using QueueWorkers.
- ParallelWorkersWrapper – Start multiple Processes or Threads that consume items from the minibatch DACT queue, and produce them onto the next registered consumers' queue.
- QueuedMinibatchTask – Data object to contain the minibatch processed by producers and consumers.
- SequentialQueuedPipeline – Using QueueWorker, run all steps sequentially even if they are in separate processes or threads.
- WorkersJoiner – Consume the results of the other _ProducerConsumerMixin workers to join their data.
- class neuraxle.distributed.streaming._ProducerConsumerStepSaver[source]¶
Bases: neuraxle.base.BaseSaver
Saver for _ProducerConsumerMixin. This saver class makes sure that the non-picklable queue is deleted upon saving for multiprocessing steps.
- save_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer[source]¶
Save a step or a step's parts using the execution context.
- Parameters
step (BaseTransformer) – step to save
context (ExecutionContext) – execution context
save_savers –
- Returns
the saved step
- Return type
BaseTransformer
- can_load(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → bool[source]¶
Returns true if we can load the given step with the given execution context.
- Parameters
step (BaseTransformer) – step to load
context (ExecutionContext) – execution context to load from
- Returns
whether the step can be loaded
- Return type
bool
- load_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer[source]¶
Load a step with the execution context.
- Parameters
step – step to load
context – execution context to load from
- Returns
loaded base step
- Return type
BaseTransformer
- exception neuraxle.distributed.streaming._QueueDestroyedError[source]¶
Bases: EOFError
Error raised when the queue is destroyed.
- class neuraxle.distributed.streaming._ProducerConsumerMixin(max_queued_minibatches: int = 0)[source]¶
Bases: neuraxle.base.MixinForBaseTransformer
A class to represent a step that can receive minibatches from a producer in its queue and consume them. Once minibatches are consumed by the present step, they are produced back to the next consumers in line.
The queue in self is the one at the entry, to be consumed by self. The output queue is external, in the registered consumers.
Therefore, in this sense, consumers and producers are both themselves instances of _HasQueueMixin and play the two roles, unless they are the first or last steps of their multiprocessing pipelines. They will thus be used to consume the produced tasks added to them in their other threads or processes.
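To make this producer/consumer wiring concrete, here is a minimal, hypothetical sketch of registering one step as the consumer of another, using only the register_consumer method documented below and assuming Identity from neuraxle.base; queue setup and worker startup are elided:

from neuraxle.base import Identity
from neuraxle.distributed.streaming import ParallelWorkersWrapper, WorkersJoiner

producer = ParallelWorkersWrapper(Identity(), n_workers=1, use_processes=False)
joiner = WorkersJoiner(batch_size=10, n_worker_wrappers_to_join=1)

# Once registered, every minibatch the producer produces is put on the
# joiner's input queue for consumption:
producer.register_consumer(joiner)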
- __init__(max_queued_minibatches: int = 0)[source]¶
Initialize self. See help(type(self)) for accurate signature.
- _setup(context: Optional[neuraxle.base.ExecutionContext] = None) → Optional[neuraxle.hyperparams.space.RecursiveDict][source]¶
- register_consumer(recepient: neuraxle.distributed.streaming._ProducerConsumerMixin) → neuraxle.distributed.streaming._ProducerConsumerMixin[source]¶
Add a consumer to self.consumers so that when self produces a minibatch, it allows consumers to consume it.
- put_minibatch_produced_to_next_consumers(task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]¶
Push a minibatch to all subsequent consumers to allow them to consume it. If the task is terminal, close our own queue.
- put_minibatch_produced(task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]¶
Put a minibatch in the queue. The caller of this method is the producer of the minibatch and is external to self.
- class neuraxle.distributed.streaming.QueuedMinibatchTask(minibatch_dact: neuraxle.data_container.DataContainer, step_name: str = None)[source]¶
Bases: object
Data object to contain the minibatch processed by producers and consumers.
- class neuraxle.distributed.streaming.MinibatchError(minibatch_dact: neuraxle.data_container.DataContainer, step_name: str, err: Exception, traceback_msg=None)[source]¶
Bases: neuraxle.distributed.streaming.QueuedMinibatchTask
Data object to represent an error in a minibatch.
- neuraxle.distributed.streaming.worker_function(worker: neuraxle.distributed.streaming.ParallelWorkersWrapper, context: neuraxle.base.ExecutionContext, use_savers: bool, logging_queue: Optional[multiprocessing.context.BaseContext.Queue])[source]¶
Worker function that transforms the items inside the queue of items to process.
- Parameters
worker (ParallelWorkersWrapper) – step to transform
context (ExecutionContext) – execution context
use_savers (bool) – use savers
- neuraxle.distributed.streaming.pickle_exception_into_task(task, err) → neuraxle.distributed.streaming.MinibatchError[source]¶
- class neuraxle.distributed.streaming.ParallelWorkersWrapper(wrapped: neuraxle.base.BaseTransformer, max_queued_minibatches: int = None, n_workers: int = 1, use_processes: bool = True, use_savers: bool = False)[source]¶
Bases: neuraxle.distributed.streaming._ProducerConsumerMixin, neuraxle.base.MetaStep
Start multiple Processes or Threads that consume items from the minibatch DACT queue, and produce them onto the next registered consumers' queue.
- __init__(wrapped: neuraxle.base.BaseTransformer, max_queued_minibatches: int = None, n_workers: int = 1, use_processes: bool = True, use_savers: bool = False)[source]¶
Initialize self. See help(type(self)) for accurate signature.
- _setup(context: Optional[neuraxle.base.ExecutionContext] = None) → Optional[neuraxle.hyperparams.space.RecursiveDict][source]¶
Internal method to set up the step. May be used by Pipeline to set up the pipeline progressively instead of all at once.
- start(context: neuraxle.base.ExecutionContext, logging_queue: Optional[multiprocessing.context.BaseContext.Queue] = None)[source]¶
Start multiple processes or threads with the worker function as a target. These workers will consume minibatches from the queue and produce them onto the next queue(s). They are started as multiprocessing daemons, so that they will not block the main process if an error requires exiting.
- Parameters
context (ExecutionContext) – an execution context that will be checked to be thread_safe.
logging_queue – an optional logging queue from a ParallelLoggingConsumerThread object, used to pass along and recover parallelized log records. Not required for thread-only parallelism; only needed for process-based parallelism.
- reload_post_saving(context: neuraxle.base.ExecutionContext) → neuraxle.distributed.streaming.ParallelWorkersWrapper[source]¶
- _teardown() → Optional[neuraxle.hyperparams.space.RecursiveDict][source]¶
Stop all processes on teardown.
- class neuraxle.distributed.streaming.BaseQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]¶
Bases: neuraxle.pipeline.MiniBatchSequentialPipeline
Subclass of Pipeline. Transform data in many pipeline steps at once, in parallel, using multiprocessing queues.
Example usage:
# Multiple ways of specifying the steps tuples exist to do various things:

# step name, step
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], n_workers_per_step=1, batch_size=10, max_queued_minibatches=10)

# step name, number of workers, step
p = SequentialQueuedPipeline([
    ('step_a', 1, Identity()),
    ('step_b', 1, Identity()),
], batch_size=10, max_queued_minibatches=10)

# step name, number of workers, and max size
p = SequentialQueuedPipeline([
    ('step_a', 1, 10, Identity()),
    ('step_b', 1, 10, Identity()),
], batch_size=10)

# step name, number of workers for each step, and additional argument for each worker
p = SequentialQueuedPipeline([
    ('step_a', 1, [('host', 'host1'), ('host', 'host2')], 10, Identity())
], batch_size=10)

# step name, number of workers for each step, additional argument for each worker, and max size
p = SequentialQueuedPipeline([
    ('step_a', 1, [('host', 'host1'), ('host', 'host2')], 10, Identity())
], batch_size=10)

# It's also possible to do parallel feature unions:
n_workers = 4
worker_arguments = [
    ('hyperparams', HyperparameterSamples({'multiply_by': 2}))
    for _ in range(n_workers)
]
p = ParallelQueuedFeatureUnion([
    ('1', n_workers, worker_arguments, MultiplyByN()),
], batch_size=10, max_queued_minibatches=5)
outputs = p.transform(list(range(100)))
- Parameters
steps – pipeline steps.
batch_size – number of elements to combine into a single batch.
n_workers_per_step – number of workers to spawn per step.
max_queued_minibatches – max number of batches inside the processing queue between the workers.
data_joiner – transformer step to join streamed batches together at the end of the pipeline.
use_processes – use processes instead of threads for parallel processing. multiprocessing.Process is used by default.
use_savers – use savers to serialize steps for parallel processing. Recommended if using processes instead of threads.
keep_incomplete_batch – (Optional.) A bool that indicates whether or not the last batch should be dropped if it has fewer than batch_size elements; the default behavior is to keep the smaller batch.
default_value_data_inputs – default fill value for padding data inputs and for values outside the iteration range, or StripAbsentValues to trim absent values from the batch. A sketch of this trimming behavior follows this list.
default_value_expected_outputs – default fill value for padding expected outputs and for values outside the iteration range, or StripAbsentValues to trim absent values from the batch.
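As mentioned in the parameter list above, here is a hedged sketch of handling the incomplete last batch, assuming StripAbsentValues is importable from neuraxle.data_container and Identity from neuraxle.base:

from neuraxle.base import Identity
from neuraxle.data_container import StripAbsentValues
from neuraxle.distributed.streaming import SequentialQueuedPipeline

# 25 items with batch_size=10 leaves a final batch of 5 elements.
# StripAbsentValues keeps that smaller batch as-is instead of padding it:
p = SequentialQueuedPipeline(
    [('step_a', Identity())],
    batch_size=10,
    keep_incomplete_batch=True,
    default_value_data_inputs=StripAbsentValues(),
)
outputs = p.transform(list(range(25)))  # expected to yield all 25 items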
__init__
(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
- _parallel_wrap_all_steps_tuples(all_step_tuples: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]]) → List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]][source]¶
Wrap each step in a QueueWorker to allow data to flow through many pipeline steps at once, in parallel.
- Parameters
all_step_tuples (NameNWorkerStepTupleList) – (name, n_workers, step) tuples
- Returns
steps as tuples
- _parallel_wrap_step_tuple(step_tuple: Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]])[source]¶
- _parse_step_tuple(step_tuple: Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]) → Tuple[str, int, int, List[Tuple], neuraxle.base.BaseTransformer][source]¶
Return all params necessary to create the QueuedPipeline for the given step.
- _will_process(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.data_container.DataContainer, neuraxle.base.ExecutionContext][source]¶
Set up the streaming pipeline before any handler methods.
- Parameters
data_container (DataContainer) – data container
context (ExecutionContext) – execution context
- Returns
(data container, execution context)
- _setup(context: neuraxle.base.ExecutionContext = None) → neuraxle.base.BaseTransformer[source]¶
Connect the queued workers together so that the data can correctly flow through the pipeline.
- Parameters
context (ExecutionContext) – execution context
- Returns
step
- Return type
BaseTransformer
- fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.pipeline.Pipeline, neuraxle.data_container.DataContainer][source]¶
Fit transform sequentially if any step is fittable, such as with MiniBatchSequentialPipeline. Otherwise, transform in parallel.
- Parameters
data_container (DataContainer) – data container
context (ExecutionContext) – execution context
- Returns
(fitted pipeline, data container)
- transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
Transform the data container.
- Parameters
data_container (DataContainer) – data container to transform.
context (ExecutionContext) – execution context
- Returns
data container
- Return type
DataContainer
- _did_transform(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
Stop all of the workers after the transform. Also, join the data using self.data_joiner.
- Parameters
data_container (DataContainer) – data container
context (ExecutionContext) – execution context
- Returns
data container
- Return type
DataContainer
- _did_process(data_container: neuraxle.data_container.DataContainer[IDT, DIT, Optional[EOT]], context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[IDT, DIT, Optional[EOT]][source]¶
Apply side effects after any step method.
- Parameters
data_container – data container
context (ExecutionContext) – execution context
- Returns
data container
- get_n_workers_to_join() → int[source]¶
Get the total number of terminal steps at the end of each row of queued workers.
- _connect_queued_pipeline()[source]¶
Connect all the queued workers together so that the data can flow through each step.
- _dispatch_minibatch_to_consumer_workers(minibatch_index: int, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]¶
Send batches to the queued pipeline. This blocks when no more space is available in the multiprocessing queues. Workers might return batches in a different order, but the queue joiner will reorder them at the end using the summary ids of the received batches.
- Parameters
minibatch_index – minibatch index
task – queued minibatch task to dispatch
- class neuraxle.distributed.streaming.SequentialQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]¶
Bases: neuraxle.distributed.streaming.BaseQueuedPipeline
Using QueueWorker, run all steps sequentially even if they are in separate processes or threads. This is a parallel pipeline that uses a queue to communicate between the steps, and which parallelizes the steps using many workers in different processes or threads.
This pipeline is useful when the steps are independent of each other and can be run in parallel. This is especially the case when the steps are not fittable, such as when inheriting from NonFittableMixin. Otherwise, fitting may not be parallelized, although the steps can still be run in parallel for the transformation. A short usage sketch follows the see-also list below.
See also
BasePipeline, minibatches(), StripAbsentValues, QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline
- get_n_workers_to_join() → int[source]¶
Get the total number of terminal steps at the end of each row of queued workers.
- _connect_queued_pipeline()[source]¶
Sequentially connect the queued workers as producers and consumers.
- _dispatch_minibatch_to_consumer_workers(minibatch_index: int, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]¶
Send the batches to process to the first queued worker.
- Parameters
minibatch_index – minibatch index
task – queued minibatch task to dispatch
- class neuraxle.distributed.streaming.ParallelQueuedFeatureUnion(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]¶
Bases: neuraxle.distributed.streaming.BaseQueuedPipeline
Using QueueWorker, run all steps in parallel using QueueWorkers. A short usage sketch follows the see-also list below.
See also
QueueWorker, BaseQueuedPipeline, SequentialQueuedPipeline
- get_n_workers_to_join()[source]¶
Get the total number of terminal steps at the end of each row of queued workers.
- _connect_queued_pipeline()[source]¶
Connect the queue joiner to all of the queued workers to process data in parallel.
- _dispatch_minibatch_to_consumer_workers(minibatch_index: int, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]¶
In the case of the feature union, a batch is dispatched by sending it to every worker; the workers then work in parallel, each consuming its own copy of the same batch.
- class neuraxle.distributed.streaming.WorkersJoiner(batch_size: int, n_worker_wrappers_to_join: int = None)[source]¶
Bases: neuraxle.distributed.streaming._ProducerConsumerMixin, neuraxle.pipeline.Joiner
Consume the results of the other _ProducerConsumerMixin workers to join their data. Also handles errors.
- __init__(batch_size: int, n_worker_wrappers_to_join: int = None)[source]¶
The Joiner step joins the transformed minibatches together using the DACT.minibatches and then the DACT.extend method. Note that the default value for IDs is None. A conceptual sketch of this joining behavior follows below.
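As referenced above, a conceptual sketch of the minibatching-then-extending behavior, using only the DataContainer methods named in this docstring (minibatches and extend); the constructor arguments are assumptions:

from neuraxle.data_container import DataContainer

dact = DataContainer(data_inputs=list(range(25)))

# Split into minibatches as the pipeline does, then extend them back together
# in order, which is conceptually what WorkersJoiner does with worker results:
rejoined = []
for minibatch in dact.minibatches(batch_size=10):
    rejoined.extend(minibatch.data_inputs)
assert rejoined == list(range(25))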
- _setup(context: Optional[neuraxle.base.ExecutionContext] = None) → Optional[neuraxle.hyperparams.space.RecursiveDict][source]¶
Internal method to set up the step. May be used by Pipeline to set up the pipeline progressively instead of all at once.
- _teardown() → Optional[neuraxle.hyperparams.space.RecursiveDict][source]¶
Properly clean the queue, summary ids, and results during teardown.
- Returns
self, after teardown
- append_terminal_summary(name: str, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]¶
Append the summary id of the worker to the list of summaries.
- Parameters
name (str) – name of the worker
task (QueuedMinibatchTask) – task
- join_workers(original_dact: neuraxle.data_container.DataContainer, sync_context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
Return the accumulated results of the workers.
- Returns
transformed data container
- Return type
DataContainer
- _consume_enqueued_minibatches(sync_context) → Dict[str, neuraxle.data_container.ListDataContainer][source]¶
- _merge_minibatches(step_to_minibatches_dacts: Dict[str, neuraxle.data_container.ListDataContainer]) → List[neuraxle.data_container.ListDataContainer][source]¶