neuraxle.distributed.streaming

Module-level documentation for neuraxle.distributed.streaming. Here is an inheritance diagram, including dependencies on other base modules of Neuraxle:


Streaming Pipelines for Parallel and Queued Data Processing

Neuraxle steps for streaming data in parallel in the pipeline.

Pipelines can stream data in queues with workers for each step. Max queue sizes can be set, as well as the number of clones per step for the transformers.
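For a quick feel of the API, here is a minimal sketch (imports assumed: Identity from neuraxle.base and SequentialQueuedPipeline from this module; see the full example usage in BaseQueuedPipeline below):

from neuraxle.base import Identity
from neuraxle.distributed.streaming import SequentialQueuedPipeline

# Two steps, one worker clone per step, and at most 10 queued minibatches between steps.
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], batch_size=10, n_workers_per_step=1, max_queued_minibatches=10)

outputs = p.transform(list(range(100)))  # streamed through the steps 10 items at a time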

Functions

pickle_exception_into_task(task, err)

worker_function(worker, context, use_savers, …)

Worker function that transforms the items inside the queue of items to process.

Classes

BaseQueuedPipeline(steps, Tuple[int, …)

Subclass of Pipeline.

MinibatchError(minibatch_dact, step_name, err)

Data object to represent an error in a minibatch.

ParallelQueuedFeatureUnion(steps, Tuple[int, …)

Run all steps in parallel, each wrapped in QueueWorker workers.

ParallelWorkersWrapper(wrapped, …)

Start multiple Processes or Threads that consume items from the minibatch DACT Queue and produce them on the next registered consumers’ queues.

QueuedMinibatchTask(minibatch_dact, step_name)

Data object to contain the minibatch processed by producers and consumers.

SequentialQueuedPipeline(steps, Tuple[int, …)

Using QueueWorker, run all steps sequentially even if they are in separate processes or threads.

WorkersJoiner(batch_size, …)

Consume the results of the other _ProducerConsumerMixin workers to join their data.



class neuraxle.distributed.streaming._ProducerConsumerStepSaver[source]

Bases: neuraxle.base.BaseSaver

Saver for _ProducerConsumerMixin. This saver class makes sure that the non-picklable queue is deleted upon saving for multiprocessing steps.

save_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer[source]

Save a step or a step’s parts using the execution context.

Return type

BaseTransformer

Parameters
  • step (BaseTransformer) – step to save

  • context – execution context

Returns

the saved step

can_load(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → bool[source]

Returns true if we can load the given step with the given execution context.

Return type

bool

Parameters
  • step (BaseTransformer) – step to load

  • context – execution context to load from

Returns

True if the step can be loaded with the given execution context, False otherwise.

load_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer[source]

Load step with execution context.

Parameters
  • step – step to load

  • context – execution context to load from

Returns

loaded base step

exception neuraxle.distributed.streaming._QueueDestroyedError[source]

Bases: EOFError

Error raised when the queue is destroyed.

class neuraxle.distributed.streaming._ProducerConsumerMixin(max_queued_minibatches: int = 0)[source]

Bases: neuraxle.base.MixinForBaseTransformer

A class to represent a step that can receive minibatches from a producer in its queue to consume them. Once minibatches are consumed by the present step, they are produced back to the next consumers in line.

The Queue in self is the one at the entry to be consumed by self. The output queue is external, in the registered consumers.

Therefore, in this sense, consumers and producers are both themselves instances of _HasQueueMixin and play both roles, unless they are the first or the last of their multiprocessing pipelines. They are thus used to consume the produced tasks added to them from other threads or processes.
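To illustrate the underlying pattern only (a plain standard-library sketch, not the Neuraxle API): each worker consumes tasks from its own input queue and produces results onto the queue of the next registered consumer.

import queue
import threading

def make_worker(in_q, out_q, transform):
    # Consume tasks from in_q, produce transformed tasks onto out_q (the next consumer's queue).
    def run():
        while True:
            task = in_q.get()
            if task is None:  # terminal task: forward it and stop consuming.
                out_q.put(None)
                break
            out_q.put(transform(task))
    return threading.Thread(target=run, daemon=True)

q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()
w1 = make_worker(q1, q2, lambda batch: [x * 2 for x in batch])
w2 = make_worker(q2, q3, lambda batch: [x + 1 for x in batch])
w1.start(); w2.start()
q1.put([0, 1, 2]); q1.put(None)  # one minibatch, then the terminal task
w1.join(); w2.join()
print(q3.get())  # [1, 3, 5]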

__init__(max_queued_minibatches: int = 0)[source]

Initialize self. See help(type(self)) for accurate signature.

_setup(context: Optional[neuraxle.base.ExecutionContext] = None) → Optional[neuraxle.hyperparams.space.RecursiveDict][source]
_init_queue()[source]
_teardown() → Optional[neuraxle.hyperparams.space.RecursiveDict][source]
register_consumer(recepient: neuraxle.distributed.streaming._ProducerConsumerMixin) → neuraxle.distributed.streaming._ProducerConsumerMixin[source]

Add a consumer to self.consumers so that when self produces a minibatch, the registered consumers can consume it.

put_minibatch_produced_to_next_consumers(task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]

Push a minibatch to all subsequent consumers to allow them to consume it. If the task is terminal, close our own queue.

put_minibatch_produced(task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]

Put a minibatch in queue. The caller of this method is the producer of the minibatch and is external to self.

_ensure_task_picklable(task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]
_get_minibatch_to_consume() → neuraxle.distributed.streaming.QueuedMinibatchTask[source]

Get last minibatch in queue. The caller of this method is probably self, that is why the method is private (starts with an underscore). This method can raise an EOFError.

join()[source]
_allow_exit_without_queue_flush()[source]
class neuraxle.distributed.streaming.QueuedMinibatchTask(minibatch_dact: neuraxle.data_container.DataContainer, step_name: str = None)[source]

Bases: object

Data object to contain the minibatch processed by producers and consumers.

__init__(minibatch_dact: neuraxle.data_container.DataContainer, step_name: str = None)[source]

Initialize self. See help(type(self)) for accurate signature.

is_error() → bool[source]
to_error(error: Exception) → neuraxle.distributed.streaming.MinibatchError[source]
class neuraxle.distributed.streaming.MinibatchError(minibatch_dact: neuraxle.data_container.DataContainer, step_name: str, err: Exception, traceback_msg=None)[source]

Bases: neuraxle.distributed.streaming.QueuedMinibatchTask

Data object to represent an error in a minibatch.

__init__(minibatch_dact: neuraxle.data_container.DataContainer, step_name: str, err: Exception, traceback_msg=None)[source]

Initialize self. See help(type(self)) for accurate signature.

is_error() → bool[source]
get_err() → Exception[source]
neuraxle.distributed.streaming.worker_function(worker: neuraxle.distributed.streaming.ParallelWorkersWrapper, context: neuraxle.base.ExecutionContext, use_savers: bool, logging_queue: Optional[multiprocessing.context.BaseContext.Queue])[source]

Worker function that transforms the items inside the queue of items to process.

Parameters
  • worker (ParallelWorkersWrapper) – the wrapped step used to transform the queued minibatches

  • context (ExecutionContext) – execution context

  • use_savers (bool) – use savers

  • logging_queue – optional queue from a ParallelLoggingConsumerThread, used to send parallelized log records back to the main process (only needed for process-based parallelism)

Returns

neuraxle.distributed.streaming.pickle_exception_into_task(task, err) → neuraxle.distributed.streaming.MinibatchError[source]
class neuraxle.distributed.streaming.ParallelWorkersWrapper(wrapped: neuraxle.base.BaseTransformer, max_queued_minibatches: int = None, n_workers: int = 1, use_processes: bool = True, use_savers: bool = False)[source]

Bases: neuraxle.distributed.streaming._ProducerConsumerMixin, neuraxle.base.MetaStep

Start multiple Processes or Threads that consume items from the minibatch DACT Queue and produce them on the next registered consumers’ queues.

__init__(wrapped: neuraxle.base.BaseTransformer, max_queued_minibatches: int = None, n_workers: int = 1, use_processes: bool = True, use_savers: bool = False)[source]

Initialize self. See help(type(self)) for accurate signature.

_setup(context: Optional[neuraxle.base.ExecutionContext] = None) → Optional[neuraxle.hyperparams.space.RecursiveDict][source]

Internal method to setup the step. May be used by Pipeline to setup the pipeline progressively instead of all at once.

start(context: neuraxle.base.ExecutionContext, logging_queue: Optional[multiprocessing.context.BaseContext.Queue] = None)[source]

Start multiple processes or threads with the worker function as a target. These workers will consume minibatches from the queue and produce them on the next queue(s). They are started as multiprocessing daemons, so that they will not block the main process if an error requires exiting.

Parameters
  • context (ExecutionContext) – An execution context that will be checked to be thread_safe.

  • logging_queue – An optional logging_queue from a ParallelLoggingConsumerThread object, used to pass parallelized log records back to the main process for recovery. Not required for thread-only parallelism; only needed for process-based parallelism.
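As a rough standard-library analogy for what the logging queue enables (this is not the ParallelLoggingConsumerThread itself): worker processes push their log records onto a shared queue, and the main process consumes and emits them.

import logging
import logging.handlers
import multiprocessing

def worker(log_queue):
    # Inside the child process: route log records to the shared queue instead of local handlers.
    logger = logging.getLogger("worker")
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.INFO)
    logger.info("processed a minibatch")

if __name__ == "__main__":
    log_queue = multiprocessing.Queue()
    # The main process listens on the queue and emits the recovered records normally.
    listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()
    p = multiprocessing.Process(target=worker, args=(log_queue,))
    p.start()
    p.join()
    listener.stop()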

reload_post_saving(context: neuraxle.base.ExecutionContext) → neuraxle.distributed.streaming.ParallelWorkersWrapper[source]
join()[source]

Wait for workers to finish at least their capture of the logging calls.

_teardown() → Optional[neuraxle.hyperparams.space.RecursiveDict][source]

Stop all processes on teardown.

stop()[source]

Stop all of the workers.

Returns

class neuraxle.distributed.streaming.BaseQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]

Bases: neuraxle.pipeline.MiniBatchSequentialPipeline

Subclass of Pipeline. Transforms data in many pipeline steps at once, in parallel, using multiprocessing Queues.

Example usage:

# Multiple ways of specifying the step tuples exist to do various things:
# step name, step
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], n_workers_per_step=1, batch_size=10, max_queued_minibatches=10)

# step name, number of workers, step
p = SequentialQueuedPipeline([
    ('step_a', 1, Identity()),
    ('step_b', 1, Identity()),
], batch_size=10, max_queued_minibatches=10)

# step name, number of workers, and max queue size
p = SequentialQueuedPipeline([
    ('step_a', 1, 10, Identity()),
    ('step_b', 1, 10, Identity()),
], batch_size=10)

# step name, number of workers, additional arguments for each worker, and max queue size
p = SequentialQueuedPipeline([
    ('step_a', 1, [('host', 'host1'), ('host', 'host2')], 10, Identity())
], batch_size=10)

# It's also possible to do parallel feature unions:
n_workers = 4
worker_arguments = [('hyperparams', HyperparameterSamples({'multiply_by': 2})) for _ in range(n_workers)]
p = ParallelQueuedFeatureUnion([
    ('1', n_workers, worker_arguments, MultiplyByN()),
], batch_size=10, max_queued_minibatches=5)
outputs = p.transform(list(range(100)))
Parameters
  • steps – pipeline steps.

  • batch_size – number of elements to combine into a single batch.

  • n_workers_per_step – number of workers to spawn per step.

  • max_queued_minibatches – max number of batches inside the processing queue between the workers.

  • data_joiner – transformer step to join streamed batches together at the end of the pipeline.

  • use_processes – use processes instead of threads for parallel processing. When True, multiprocessing.Process is used; threads are the default.

  • use_savers – use savers to serialize steps for parallel processing. Recommended if using processes instead of threads.

  • keep_incomplete_batch – (Optional.) A bool that indicates whether or not the last batch should be dropped when it has fewer than batch_size elements; the default behavior is to keep the smaller batch (see the sketch after this parameter list).

  • default_value_data_inputs – default fill value for data inputs, used for padding and for values outside the iteration range, or StripAbsentValues to trim absent values from the batch.

  • default_value_expected_outputs – default fill value for expected outputs, used for padding and for values outside the iteration range, or StripAbsentValues to trim absent values from the batch.
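For instance, a hedged sketch of the expected batching behavior (reusing the Identity step from the example usage above): with 25 inputs and batch_size=10, the default keeps the trailing 5-element minibatch, whereas keep_incomplete_batch=False would drop it.

p = SequentialQueuedPipeline([
    ('step_a', Identity()),
], batch_size=10, n_workers_per_step=1)

# 25 inputs -> minibatches of 10, 10 and 5 items; the incomplete batch is kept by default.
outputs = p.transform(list(range(25)))

# With keep_incomplete_batch=False, only the two full minibatches (20 items) would be processed.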

__init__(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

_parallel_wrap_all_steps_tuples(all_step_tuples: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]]) → List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]][source]

Wrap each step with a QueueWorker so that data can flow through many pipeline steps at once, in parallel.

Parameters

steps (NameNWorkerStepTupleList) – (name, n_workers, step)

Returns

steps as tuple

_parallel_wrap_step_tuple(step_tuple: Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]])[source]
_parse_step_tuple(step_tuple: Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]) → Tuple[str, int, int, List[Tuple], neuraxle.base.BaseTransformer][source]

Return all params necessary to create the QueuedPipeline for the given step.

Parameters

step_tuple – the un-parsed tuple of steps

Returns

a tuple of (name, n_workers, max_queued_minibatches, additional_worker_arguments, actual_step)

Return type

Tuple[str, int, int, List[Tuple], BaseTransformer]

_will_process(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.data_container.DataContainer, neuraxle.base.ExecutionContext][source]

Setup streaming pipeline before any handler methods.

Parameters
Returns

_setup(context: neuraxle.base.ExecutionContext = None) → neuraxle.base.BaseTransformer[source]

Connect the queued workers together so that the data can correctly flow through the pipeline.

Parameters

context (ExecutionContext) – execution context

Returns

step

Return type

BaseStep

fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.pipeline.Pipeline, neuraxle.data_container.DataContainer][source]

Fit and transform sequentially if any step is fittable, as with MiniBatchSequentialPipeline. Otherwise, transform in parallel.

Parameters
Returns

transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Transform data container

Return type

DataContainer

Parameters
Returns

data container

_did_transform(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Stop all of the workers after transform. Also, join the data using self.data_joiner.

Parameters
Returns

data container

Return type

DataContainer

_did_process(data_container: neuraxle.data_container.DataContainer[IDT, DIT, Optional[EOT]], context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[IDT, DIT, Optional[EOT]][source]

Apply side effects after any step method.

Parameters
  • data_container – data container

  • context (ExecutionContext) – execution context

Returns

data container

get_n_workers_to_join() → int[source]

Get the total number of terminal steps at the end of each row of queued workers.

_connect_queued_pipeline()[source]

Connect all the queued workers together so that the data can flow through each step.

Returns

_disconnect_queued_pipeline()[source]
_dispatch_minibatch_to_consumer_workers(minibatch_index: int, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]

Send batches to the queued pipeline. This is blocking if there is no more space available in the multiprocessing queues. Workers might return batches in a different order, but the queue joiner will reorder them at the end using their summary ids.

Parameters
  • minibatch_index – index of the minibatch being dispatched

  • task – the queued minibatch task to dispatch

Returns

class neuraxle.distributed.streaming.SequentialQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]

Bases: neuraxle.distributed.streaming.BaseQueuedPipeline

Using QueueWorker, run all steps sequentially even if they are in separate processes or threads. This is a parallel pipeline that uses queues to communicate between the steps and parallelizes them using many workers in different processes or threads.

This pipeline is useful when the steps are independent of each other and can be run in parallel. This is especially the case when the steps are not fittable, such as steps inheriting from NonFittableMixin. Otherwise, fitting may not be parallelized, although the steps can still be run in parallel for the transformation.
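A hedged usage sketch with a non-fittable step (assuming the usual NonFittableMixin/BaseStep pattern from neuraxle.base; the step name and class are illustrative only):

from neuraxle.base import BaseStep, NonFittableMixin
from neuraxle.distributed.streaming import SequentialQueuedPipeline

class MultiplyBy2(NonFittableMixin, BaseStep):
    # A transform-only step: fitting is a no-op, so workers can process minibatches freely.
    def __init__(self):
        BaseStep.__init__(self)
        NonFittableMixin.__init__(self)

    def transform(self, data_inputs):
        return [2 * di for di in data_inputs]

p = SequentialQueuedPipeline([
    ('double', MultiplyBy2()),
], batch_size=10, use_processes=False)  # threads keep this locally defined class usable without savers
outputs = p.transform(list(range(20)))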

See also

BasePipeline, minibatches(), StripAbsentValues, QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline

get_n_workers_to_join() → int[source]

Get the total number of terminal steps at the end of each row of queued workers.

_connect_queued_pipeline()[source]

Sequentially connect the queued workers as producers and consumers.

Returns

_dispatch_minibatch_to_consumer_workers(minibatch_index: int, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]

Send the batches to process to the first queued worker.

Parameters
  • minibatch_index – index of the minibatch being dispatched

  • task – the queued minibatch task to dispatch

Returns

class neuraxle.distributed.streaming.ParallelQueuedFeatureUnion(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queued_minibatches: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = False, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]

Bases: neuraxle.distributed.streaming.BaseQueuedPipeline

Run all steps in parallel, each wrapped in QueueWorker workers.
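Repeating the feature-union example from the module docstring above as a usage sketch (import locations assumed: MultiplyByN from neuraxle.steps.numpy and HyperparameterSamples from neuraxle.hyperparams.space):

from neuraxle.hyperparams.space import HyperparameterSamples
from neuraxle.steps.numpy import MultiplyByN
from neuraxle.distributed.streaming import ParallelQueuedFeatureUnion

# Four workers, each given its own hyperparameters, consuming the same minibatches in parallel.
n_workers = 4
worker_arguments = [('hyperparams', HyperparameterSamples({'multiply_by': 2})) for _ in range(n_workers)]
p = ParallelQueuedFeatureUnion([
    ('1', n_workers, worker_arguments, MultiplyByN()),
], batch_size=10, max_queued_minibatches=5)
outputs = p.transform(list(range(100)))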

get_n_workers_to_join()[source]

Get the total number of terminal steps at the end of each row of queued workers.

_connect_queued_pipeline()[source]

Connect the queue joiner to all of the queued workers to process data in parallel.

Returns

_dispatch_minibatch_to_consumer_workers(minibatch_index: int, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]

In the case of the feature union, dispatching a batch means sending a copy of it to every worker; the workers then consume the same batch in parallel.

class neuraxle.distributed.streaming.WorkersJoiner(batch_size: int, n_worker_wrappers_to_join: int = None)[source]

Bases: neuraxle.distributed.streaming._ProducerConsumerMixin, neuraxle.pipeline.Joiner

Consume the results of the other _ProducerConsumerMixin workers to join their data. Also do error handling.

__init__(batch_size: int, n_worker_wrappers_to_join: int = None)[source]

The Joiner step joins the transformed minibatches together using the DACT.minibatches and then the DACT.extend method. Note that the default value for IDs is None.
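Conceptually, the join amounts to sorting the received minibatches by their ids and extending an accumulator with each one, roughly as in this plain-Python sketch (not the actual DACT API):

# Minibatches may arrive out of order; their ids are used to restore the original order.
received = [(1, [10, 11, 12]), (0, [0, 1, 2])]
joined = []
for _, minibatch in sorted(received, key=lambda pair: pair[0]):
    joined.extend(minibatch)
print(joined)  # [0, 1, 2, 10, 11, 12]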

_setup(context: Optional[neuraxle.base.ExecutionContext] = None) → Optional[neuraxle.hyperparams.space.RecursiveDict][source]

Internal method to setup the step. May be used by Pipeline to setup the pipeline progressively instead of all at once.

_teardown() → Optional[neuraxle.hyperparams.space.RecursiveDict][source]

Properly clean queue, summary ids, and results during teardown.

Returns

the torn-down self

set_join_quantities(n_workers: int, n_minibatches_per_worker: int)[source]
append_terminal_summary(name: str, task: neuraxle.distributed.streaming.QueuedMinibatchTask)[source]

Append the summary id of the worker to the list of summaries.

Parameters
Returns

join_workers(original_dact: neuraxle.data_container.DataContainer, sync_context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Return the accumulated results of the workers.

Returns

transformed data container

Return type

DataContainer

_consume_enqueued_minibatches(sync_context) → Dict[str, neuraxle.data_container.ListDataContainer[List[str], neuraxle.data_container.ListDataContainer[IDT, DIT, EOT], List[None]]][source]
_merge_minibatches(step_to_minibatches_dacts: Dict[str, neuraxle.data_container.ListDataContainer[List[str], neuraxle.data_container.ListDataContainer[IDT, DIT, EOT], List[None]]]) → List[neuraxle.data_container.ListDataContainer][source]