neuraxle.distributed.streaming

Module-level documentation for neuraxle.distributed.streaming. Here is an inheritance diagram, including dependencies on other base modules of Neuraxle:

Inheritance diagram of neuraxle.distributed.streaming

Streaming Pipelines for Parallel and Queued Data Processing

Neuraxle steps for streaming data in parallel in the pipeline.

Pipelines can stream data through queues, with workers assigned to each step. Maximum queue sizes can be set, as well as the number of clones per step for the transformers.
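For example, a minimal queued pipeline can be sketched as follows (Identity is used as a placeholder step; import paths are assumed and may vary across Neuraxle versions):

from neuraxle.base import Identity
from neuraxle.distributed.streaming import SequentialQueuedPipeline

# Two worker threads per step pull batches of 10 items from bounded queues.
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], batch_size=10, n_workers_per_step=2, max_queue_size=5, use_processes=False)

outputs = p.transform(list(range(100)))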

Functions

worker_function(queue_worker, context, …)

Worker function that transforms the items inside the queue of items to process.

Classes

BaseQueuedPipeline(steps, Tuple[int, …)

Subclass of Pipeline.

ObservableQueueMixin(queue)

A mixin for a step that can put items in a queue so that the step can consume its tasks on its own.

ObservableQueueStepSaver

Saver for observable queue steps.

ParallelQueuedFeatureUnion(steps, Tuple[int, …)

Run all steps in parallel, each wrapped in a QueueWorker.

QueueJoiner(batch_size[, n_batches])

Observe the results of the QueueWorker steps.

QueueWorker(wrapped, max_queue_size, …)

Start multiple processes or threads that process items from the queue of batches to process.

QueuedPipelineTask(data_container[, step_name])

Data object to contain the tasks processed by the queued pipeline.

SequentialQueuedPipeline(steps, Tuple[int, …)

Using QueueWorker, run all steps sequentially even if they are in separate processes or threads.

Examples using neuraxle.distributed.streaming.SequentialQueuedPipeline


class neuraxle.distributed.streaming.ObservableQueueMixin(queue: multiprocessing.context.BaseContext.Queue)[source]

Bases: neuraxle.base.MixinForBaseTransformer

A mixin for a step that can put items in a queue so that the step can consume its tasks on its own.

Once tasks are completed, their results are pushed to the subscribed queue workers by calling notify_step; subscribers are registered with subscribe_step.

A subscriber is itself an ObservableQueueMixin and will thus consume the tasks added to it, typically in another thread or process.
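The observable-queue pattern can be sketched with plain Python queues (a simplified, hypothetical illustration, not the actual Neuraxle implementation):

import queue

# Each toy observable owns a queue and a list of subscribers; notifying
# pushes a value onto every subscriber's queue, to be consumed elsewhere.
class ToyObservableQueue:
    def __init__(self):
        self.queue = queue.Queue()
        self.observers = []

    def subscribe(self, observer: 'ToyObservableQueue') -> 'ToyObservableQueue':
        self.observers.append(observer)
        return self

    def notify(self, value):
        for observer in self.observers:
            observer.queue.put(value)

a, b = ToyObservableQueue(), ToyObservableQueue()
a.subscribe(b)
a.notify('task')  # the task lands on b's queue
assert b.queue.get() == 'task'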

See also

BaseStep, QueuedPipelineTask, QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, SequentialQueuedPipeline

__init__(queue: multiprocessing.context.BaseContext.Queue)[source]

Initialize self. See help(type(self)) for accurate signature.

_teardown()[source]
_add_observable_queue_step_saver()[source]
subscribe_step(observer_queue_worker: neuraxle.distributed.streaming.ObservableQueueMixin) → neuraxle.distributed.streaming.ObservableQueueMixin[source]

Subscribe a queue worker to self so that tasks can be posted onto its queue. The subscribed queue workers receive tasks when notify_step() is called.

get_task() → neuraxle.distributed.streaming.QueuedPipelineTask[source]

Get the next task from the queue.

put_task(value: neuraxle.data_container.DataContainer)[source]

Put a queued pipeline task in queue.

notify_step(value: neuraxle.data_container.DataContainer)[source]

Notify all subscribed queue workers by putting a task on each of their queues.

class neuraxle.distributed.streaming.QueuedPipelineTask(data_container, step_name=None)[source]

Bases: object

Data object to contain the tasks processed by the queued pipeline. Attributes: step_name, data_container

See also

QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, SequentialQueuedPipeline

__init__(data_container, step_name=None)[source]

Initialize self. See help(type(self)) for accurate signature.

class neuraxle.distributed.streaming.ObservableQueueStepSaver[source]

Bases: neuraxle.base.BaseSaver

Saver for observable queue steps.

save_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer[source]

Save step with execution context.

Return type

BaseTransformer

Parameters
  • step (BaseTransformer) – step to save

  • context – execution context

Returns

can_load(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → bool[source]

Returns true if we can load the given step with the given execution context.

Return type

bool

Parameters
  • step (BaseTransformer) – step to load

  • context – execution context to load from

Returns

load_step(step: neuraxle.base.BaseTransformer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseTransformer[source]

Load step with execution context.

Parameters
  • step – step to load

  • context – execution context to load from

Returns

loaded base step

class neuraxle.distributed.streaming.QueueWorker(wrapped: neuraxle.base.BaseTransformer, max_queue_size: int, n_workers: int, use_processes: bool = True, additional_worker_arguments: List[T] = None, use_savers: bool = False)[source]

Bases: neuraxle.distributed.streaming.ObservableQueueMixin, neuraxle.base.MetaStep

Start multiple processes or threads that process items from the queue of batches to process. It is both an observable and an observer: it receives the next data container to process, and it notifies its subscribers with the results of the wrapped step's handled transform method.

See also

Observer, Observable, MetaStepMixin, BaseStep
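For illustration only, a QueueWorker could be constructed directly with the signature above (in practice, BaseQueuedPipeline wraps each step in a QueueWorker for you; the Identity import path is assumed):

from neuraxle.base import Identity
from neuraxle.distributed.streaming import QueueWorker

# Wrap a step so that two worker threads consume batches from its queue.
worker = QueueWorker(
    wrapped=Identity(),
    max_queue_size=10,
    n_workers=2,
    use_processes=False,
)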

__init__(wrapped: neuraxle.base.BaseTransformer, max_queue_size: int, n_workers: int, use_processes: bool = True, additional_worker_arguments: List[T] = None, use_savers: bool = False)[source]

Initialize self. See help(type(self)) for accurate signature.

start(context: neuraxle.base.ExecutionContext)[source]

Start multiple processes or threads with the worker function as a target.

Parameters

context (ExecutionContext) – execution context

Returns

_teardown()[source]

Stop all processes on teardown.

Returns

self, after teardown

stop()[source]

Stop all of the workers.

Returns

neuraxle.distributed.streaming.worker_function(queue_worker: neuraxle.distributed.streaming.QueueWorker, context: neuraxle.base.ExecutionContext, use_savers: bool, additional_worker_arguments)[source]

Worker function that transforms the items inside the queue of items to process.

Parameters
  • queue_worker (QueueWorker) – step to transform

  • context (ExecutionContext) – execution context

  • use_savers (bool) – use savers

  • additional_worker_arguments – any additional arguments that need to be passed to the workers

Returns
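
Conceptually, each worker runs a loop similar to the simplified sketch below (the real worker_function also handles savers, execution contexts, and error propagation):

def toy_worker_loop(input_queue, step, subscriber_queues):
    # Simplified, hypothetical sketch: pull tasks until a poison pill arrives,
    # transform each task, then push the result to every subscriber queue.
    while True:
        task = input_queue.get()
        if task is None:  # poison pill: stop the worker
            break
        result = step.transform(task)
        for subscriber_queue in subscriber_queues:
            subscriber_queue.put(result)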

class neuraxle.distributed.streaming.BaseQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queue_size: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = True, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, cache_folder: str = None)[source]

Bases: neuraxle.pipeline.MiniBatchSequentialPipeline

Subclass of Pipeline. Transform data in many pipeline steps at once, in parallel, using multiprocessing queues.

Example usage:

# Imports for this example (module paths assumed; they may vary across Neuraxle versions):
from neuraxle.base import Identity
from neuraxle.distributed.streaming import ParallelQueuedFeatureUnion, SequentialQueuedPipeline
from neuraxle.hyperparams.space import HyperparameterSamples
from neuraxle.steps.numpy import MultiplyByN

# Multiple ways of specifying the steps tuples exist to do various things:
# step name, step
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], n_workers_per_step=1, batch_size=10, max_queue_size=10)

# step name, number of workers, step
p = SequentialQueuedPipeline([
    ('step_a', 1, Identity()),
    ('step_b', 1, Identity()),
], batch_size=10, max_queue_size=10)

# step name, number of workers, max queue size, and step
p = SequentialQueuedPipeline([
    ('step_a', 1, 10, Identity()),
    ('step_b', 1, 10, Identity()),
], batch_size=10)

# step name, number of workers for each step, and additional argument for each worker
p = SequentialQueuedPipeline([
    ('step_a', 1, [('host', 'host1'), ('host', 'host2')], Identity())
], batch_size=10)

# step name, number of workers for each step, additional argument for each worker, and max size
p = SequentialQueuedPipeline([
    ('step_a', 1, [('host', 'host1'), ('host', 'host2')], 10, Identity())
], batch_size=10)

# It's also possible to do parallel feature unions:
n_workers = 4
worker_arguments = [('hyperparams', HyperparameterSamples({'multiply_by': 2})) for _ in range(n_workers)]
p = ParallelQueuedFeatureUnion([
    ('1', n_workers, worker_arguments, MultiplyByN()),
], batch_size=10, max_queue_size=5)
outputs = p.transform(list(range(100)))
Parameters
  • steps – pipeline steps.

  • batch_size – number of elements to combine into a single batch.

  • n_workers_per_step – number of workers to spawn per step.

  • max_queue_size – max number of batches inside the processing queue between the workers.

  • data_joiner – transformer step to join streamed batches together at the end of the pipeline.

  • use_processes – use processes instead of threads for parallel processing. multiprocessing.context.Process is used by default.

  • use_savers – use savers to serialize steps for parallel processing. Recommended if using processes instead of threads.

  • keep_incomplete_batch – (Optional.) A bool that indicates whether or not the last batch should be dropped if it has fewer than batch_size elements; the default behavior is to keep the smaller batch.

  • default_value_data_inputs – default fill value for data inputs, used for padding and values outside the iteration range, or AbsentValuesNullObject to trim absent values from the batch.

  • default_value_expected_outputs – default fill value for expected outputs, used for padding and values outside the iteration range, or AbsentValuesNullObject to trim absent values from the batch.

  • cache_folder – cache folder, if it is at the root of the pipeline.
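
As a concrete illustration of the batching parameters (a sketch assuming 23 items and a batch size of 10):

import math

data_inputs = list(range(23))
batch_size = 10

# keep_incomplete_batch=True (default): batches of sizes 10, 10 and 3.
n_batches_kept = math.ceil(len(data_inputs) / batch_size)    # 3
# keep_incomplete_batch=False: the trailing 3 items are dropped.
n_batches_dropped = len(data_inputs) // batch_size            # 2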

See also

QueueWorker, QueueJoiner, CustomPipelineMixin, Pipeline

__init__(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queue_size: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = True, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, cache_folder: str = None)[source]

Initialize self. See help(type(self)) for accurate signature.

_initialize_steps_as_tuple(steps)[source]

Wrap each step by a QueueWorker to allow data to flow in many pipeline steps at once in parallel.

Parameters

steps (NameNWorkerStepTupleList) – (name, n_workers, step)

Returns

steps as tuple

Return type

NamedTupleList

_create_queue_worker(step: Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]])[source]
_get_step_params(step)[source]

Return all params necessary to create the QueuedPipeline for the given step.

Parameters

step (QueuedPipelineStepsTupleList) – tuple

Returns

return name, n_workers, max_queue_size, actual_step

Return type

tuple(str, int, int, BaseStep)

_will_process(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) -> (<class 'neuraxle.data_container.DataContainer'>, <class 'neuraxle.base.ExecutionContext'>)[source]

Setup streaming pipeline before any handler methods.

Parameters
Returns

setup(context: neuraxle.base.ExecutionContext = None) → neuraxle.base.BaseTransformer[source]

Connect the queued workers together so that the data can correctly flow through the pipeline.

Parameters

context (ExecutionContext) – execution context

Returns

step

Return type

BaseStep

fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) -> (<class 'neuraxle.pipeline.Pipeline'>, <class 'neuraxle.data_container.DataContainer'>)[source]

Fit transform sequentially if any step is fittable. Otherwise transform in parallel.

Parameters
Returns

transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Transform the data container.

Return type

DataContainer

Parameters
Returns

data container

_did_transform(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Stop all of the workers after transform. Also, join the data using self.data_joiner.

Parameters
Returns

data container

Return type

DataContainer

get_n_batches(data_container) → int[source]

Get the total number of batches that the queue joiner is supposed to receive.

Return type

int

Parameters

data_container (DataContainer) – data container to transform

Returns

connect_queued_pipeline()[source]

Connect all the queued workers together so that the data can flow through each step.

Returns

send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)[source]

Send batches to queued pipeline. It is blocking if there is no more space available in the multiprocessing queues. Workers might return batches in a different order, but the queue joiner will reorder them at the end. The queue joiner will use the summary ids to reorder all of the received batches.

Parameters
  • batch_index (int) – batch index

  • data_container (DataContainer) – data container batch

Returns

class neuraxle.distributed.streaming.SequentialQueuedPipeline(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queue_size: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = True, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, cache_folder: str = None)[source]

Bases: neuraxle.distributed.streaming.BaseQueuedPipeline

Using QueueWorker, run all steps sequentially even if they are in separate processes or threads.

See also

minibatches(), AbsentValuesNullObject, QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, QueueJoiner, Observer, Observable

get_n_batches(data_container) → int[source]

Get the number of batches to process.

Return type

int

Parameters

data_container – data container to transform

Returns

number of batches

connect_queued_pipeline()[source]

Sequentially connect the queued workers.

Returns

send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)[source]

Send batches to process to the first queued worker.

Parameters
  • batch_index (int) – batch index

  • data_container (DataContainer) – data container batch

Returns

class neuraxle.distributed.streaming.ParallelQueuedFeatureUnion(steps: List[Union[neuraxle.base.BaseTransformer, Tuple[int, neuraxle.base.BaseTransformer], Tuple[str, neuraxle.base.BaseTransformer], Tuple[str, int, neuraxle.base.BaseTransformer], Tuple[str, int, int, neuraxle.base.BaseTransformer], Tuple[str, int, List[Tuple], neuraxle.base.BaseTransformer]]], batch_size: int, n_workers_per_step: int = None, max_queue_size: int = None, data_joiner: neuraxle.base.BaseTransformer = None, use_processes: bool = True, use_savers: bool = False, keep_incomplete_batch: bool = True, default_value_data_inputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.AbsentValuesNullObject] = None, cache_folder: str = None)[source]

Bases: neuraxle.distributed.streaming.BaseQueuedPipeline

Run all steps in parallel, each wrapped in a QueueWorker.

get_n_batches(data_container)[source]

Get the number of batches to process by the queue joiner.

Returns

connect_queued_pipeline()[source]

Connect the queue joiner to all of the queued workers to process data in parallel.

Returns

send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)[source]

Send batches to process to all of the queued workers.

Parameters
  • batch_index (int) – batch index

  • data_container (DataContainer) – data container batch

Returns

class neuraxle.distributed.streaming.QueueJoiner(batch_size, n_batches=None)[source]

Bases: neuraxle.distributed.streaming.ObservableQueueMixin, neuraxle.pipeline.Joiner

Observe the results of the QueueWorker steps and synchronize all of the workers together.

See also

QueuedPipeline, Observer, ListDataContainer, DataContainer

__init__(batch_size, n_batches=None)[source]

Initialize self. See help(type(self)) for accurate signature.

_teardown() → neuraxle.base.BaseTransformer[source]

Properly clean queue, summary ids, and results during teardown.

Returns

self, after teardown

set_n_batches(n_batches)[source]
join(original_data_container: neuraxle.data_container.DataContainer) → neuraxle.data_container.DataContainer[source]

Return the accumulated results received by the on next method of this observer.

Returns

transformed data container

Return type

DataContainer
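
A simplified sketch of the reorder-and-concatenate idea (the actual joiner keys the received batches by their summary ids and works on data containers):

# Hypothetical example: results may arrive out of order, tagged with their
# batch index; sort by index, then concatenate back into a single list.
received = [(2, [40, 50]), (0, [0, 10]), (1, [20, 30])]
joined = []
for _, batch in sorted(received, key=lambda pair: pair[0]):
    joined.extend(batch)
assert joined == [0, 10, 20, 30, 40, 50]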

_join_all_step_results()[source]

Concatenate all resulting data containers together.

Returns

_raise_exception_throwned_by_workers_if_needed(data_containers)[source]
_join_step_results(data_containers)[source]