neuraxle.distributed.streaming

Streaming Parallel Data Processing

Neuraxle steps for streaming data through a pipeline in parallel.

Functions

worker_function(queue_worker, context, …)

Worker function that transforms the items in the queue of batches to process.

Classes

BaseQueuedPipeline(steps, batch_size[, …])

Subclass of Pipeline.

ObservableQueueMixin(queue)

A class to represent a step that can put items in a queue.

ObservableQueueStepSaver

Saver for observable queue steps.

ParallelQueuedFeatureUnion(steps, batch_size[, …])

Run all steps in parallel, each wrapped in a QueueWorker.

QueueJoiner(batch_size[, n_batches])

Observe the results of QueueWorker steps and synchronize the workers.

QueueWorker(wrapped, max_queue_size, n_workers, use_threading[, …])

Start multiple processes or threads that process items from the queue of batches.

QueuedPipelineTask(data_container[, step_name])

Data object to contain the tasks processed by the queued pipeline.

SequentialQueuedPipeline(steps, batch_size[, …])

Run all steps sequentially, even though each step runs in its own process or thread as a QueueWorker.

class neuraxle.distributed.streaming.BaseQueuedPipeline(steps: List[Union[neuraxle.base.BaseStep, Tuple[int, neuraxle.base.BaseStep], Tuple[str, neuraxle.base.BaseStep], Tuple[str, int, neuraxle.base.BaseStep], Tuple[str, int, int, neuraxle.base.BaseStep], Tuple[str, int, List[Tuple], neuraxle.base.BaseStep]]], batch_size, n_workers_per_step=None, max_queue_size=None, data_joiner=None, use_threading=False, use_savers=False, cache_folder=None)[source]

Subclass of Pipeline. Transforms data through many pipeline steps at once, in parallel, using multiprocessing queues.

Example usage:

# step name, step

p = QueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], n_workers=1, batch_size=10, max_queue_size=10)

# step name, number of workers, step

p = QueuedPipeline([
    ('step_a', 1, Identity()),
    ('step_b', 1, Identity()),
], batch_size=10, max_queue_size=10)

# step name, number of workers, and max queue size

p = QueuedPipeline([
    ('step_a', 1, 10, Identity()),
    ('step_b', 1, 10, Identity()),
], batch_size=10)

# step name, number of workers, and additional arguments for each worker

p = QueuedPipeline([
    ('step_a', 2, [('host', 'host1'), ('host', 'host2')], Identity())
], batch_size=10)

# step name, number of workers, additional arguments for each worker, and max queue size

p = QueuedPipeline([
    ('step_a', 2, [('host', 'host1'), ('host', 'host2')], 10, Identity())
], batch_size=10)
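In the examples above, QueuedPipeline stands for any concrete subclass, since BaseQueuedPipeline itself is abstract. A minimal end-to-end sketch using SequentialQueuedPipeline (assuming Identity is importable from neuraxle.base):

from neuraxle.base import Identity
from neuraxle.distributed.streaming import SequentialQueuedPipeline

# Two pass-through steps, one worker each, streaming batches of 10 items.
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], batch_size=10, n_workers_per_step=1, max_queue_size=10)

outputs = p.transform(list(range(100)))  # 10 batches flow through the queues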

See also

QueueWorker, QueueJoiner, CustomPipelineMixin, Pipeline

connect_queued_pipeline()[source]

Connect all the queued workers together so that the data can flow through each step.


fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[Pipeline, neuraxle.data_container.DataContainer][source]

Fit transform sequentially if any step is fittable. Otherwise transform in parallel.

Parameters
  • data_container – data container to fit and transform

  • context – execution context

Returns

(pipeline, data container) tuple

get_n_batches(data_container) → int[source]

Get the total number of batches that the queue joiner is supposed to receive.

Parameters

data_container (DataContainer) – data container to transform

Returns

number of batches
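Presumably this is a ceiling division of the number of data inputs by the batch size, as in this sketch:

import math

n_batches = math.ceil(len(data_container.data_inputs) / batch_size)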

send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)[source]

Send batches to the queued pipeline. This call blocks when there is no more space available in the multiprocessing queues. Workers may return batches in a different order, but the queue joiner reorders them at the end using the summary ids of the received batches (see the sketch below).

Parameters
  • batch_index – batch index

  • data_container – data container batch

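As a hypothetical illustration of that reordering (the real bookkeeping is internal and keyed on DataContainer summary ids):

# Sort out-of-order results back into send order by their summary ids.
original_summary_ids = ['id0', 'id1', 'id2']                          # send order
received = [('id2', 'batch2'), ('id0', 'batch0'), ('id1', 'batch1')]  # arrival order
received.sort(key=lambda pair: original_summary_ids.index(pair[0]))
# received is now back in the original send order.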

setup() → neuraxle.base.BaseStep[source]

Connect the queued workers together so that the data can correctly flow through the pipeline.

Returns

step

Return type

BaseStep

transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Transform a data container.

Parameters
  • data_container – data container to transform

  • context – execution context

Returns

transformed data container

class neuraxle.distributed.streaming.ObservableQueueMixin(queue)[source]

A class to represent a step that can put items in a queue. It can also notify other queues that have subscribed to it using subscribe().

See also

BaseStep, QueuedPipelineTask, QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, SequentialQueuedPipeline

get() → neuraxle.distributed.streaming.QueuedPipelineTask[source]

Get the next item from the queue.

notify(value)[source]

Notify all subscribed queue workers with the given value.

put(value: neuraxle.data_container.DataContainer)[source]

Put a queued pipeline task in queue.

subscribe(observer_queue_worker: neuraxle.distributed.streaming.ObservableQueueMixin) → neuraxle.distributed.streaming.ObservableQueueMixin[source]

Subscribe a queue worker. The subscribed queue workers get notified when notify() is called.
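For instance, wiring two workers by hand might look like this sketch (the queued pipelines normally do this internally; Identity from neuraxle.base is assumed):

from neuraxle.base import Identity
from neuraxle.distributed.streaming import QueueWorker

worker_a = QueueWorker(Identity(), max_queue_size=5, n_workers=1, use_threading=True)
worker_b = QueueWorker(Identity(), max_queue_size=5, n_workers=1, use_threading=True)

worker_a.subscribe(worker_b)   # worker_b now observes worker_a
worker_a.notify('some_value')  # pushes the value into worker_b's queue
task = worker_b.get()          # worker_b consumes the QueuedPipelineTask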

class neuraxle.distributed.streaming.ObservableQueueStepSaver[source]

Saver for observable queue steps.

can_load(step: neuraxle.base.BaseStep, context: neuraxle.base.ExecutionContext)[source]

Returns true if we can load the given step with the given execution context.

Parameters
  • step – step to load

  • context – execution context to load from

Returns

True if the step can be loaded, False otherwise

load_step(step: neuraxle.base.BaseStep, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseStep[source]

Load step with execution context.

Parameters
  • step – step to load

  • context – execution context to load from

Returns

loaded base step

save_step(step: neuraxle.base.BaseStep, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseStep[source]

Save step with execution context.

Parameters
  • step – step to save

  • context – execution context

Returns

saved step

class neuraxle.distributed.streaming.ParallelQueuedFeatureUnion(steps: List[Union[neuraxle.base.BaseStep, Tuple[int, neuraxle.base.BaseStep], Tuple[str, neuraxle.base.BaseStep], Tuple[str, int, neuraxle.base.BaseStep], Tuple[str, int, int, neuraxle.base.BaseStep], Tuple[str, int, List[Tuple], neuraxle.base.BaseStep]]], batch_size, n_workers_per_step=None, max_queue_size=None, data_joiner=None, use_threading=False, use_savers=False, cache_folder=None)[source]

Run all steps in parallel, each wrapped in a QueueWorker.
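A minimal usage sketch (assuming Identity from neuraxle.base):

from neuraxle.base import Identity
from neuraxle.distributed.streaming import ParallelQueuedFeatureUnion

# Every batch is sent to all steps at once; each step has its own workers.
p = ParallelQueuedFeatureUnion([
    ('step_a', Identity()),
    ('step_b', Identity()),
], batch_size=10, n_workers_per_step=2, max_queue_size=5)

outputs = p.transform(list(range(100)))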

connect_queued_pipeline()[source]

Connect the queue joiner to all of the queued workers to process data in parallel.


get_n_batches(data_container)[source]

Get the number of batches to process by the queue joiner.

Returns

number of batches

send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)[source]

Send batches to process to all of the queued workers.

Parameters
  • batch_index – batch index

  • data_container – data container batch


class neuraxle.distributed.streaming.QueueJoiner(batch_size, n_batches=None)[source]

Observe the results of QueueWorker steps and synchronize all of the workers together.

See also

QueuedPipeline, Observer, ListDataContainer, DataContainer

join(original_data_container: neuraxle.data_container.DataContainer) → neuraxle.data_container.DataContainer[source]

Return the accumulated results received by the on_next() method of this observer.

Returns

transformed data container

Return type

DataContainer
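Conceptually (a hedged sketch; original_data_container stands for the container that was split into batches, and the pipeline makes this call internally):

joiner = QueueJoiner(batch_size=10, n_batches=5)
# ... QueueWorker steps notify the joiner as they finish each batch ...
result = joiner.join(original_data_container)  # blocks until all 5 batches arrive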

set_n_batches(n_batches)[source]

Set the number of batches that the queue joiner is supposed to receive.

class neuraxle.distributed.streaming.QueueWorker(wrapped: neuraxle.base.BaseStep, max_queue_size: int, n_workers: int, use_threading: bool, additional_worker_arguments=None, use_savers=False)[source]

Start multiple processes or threads that process items from the queue of batches. It is both an observable and an observer: it notifies the results of the wrapped step's handle transform method, and it receives the next data containers to process.

See also

Observer, Observable, MetaStepMixin, BaseStep
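A minimal lifecycle sketch (a default ExecutionContext and Identity from neuraxle.base are assumed; the queued pipeline classes normally manage this for you):

from neuraxle.base import ExecutionContext, Identity
from neuraxle.distributed.streaming import QueueWorker

worker = QueueWorker(Identity(), max_queue_size=10, n_workers=2, use_threading=True)
worker.start(ExecutionContext())  # spawn 2 threads running worker_function
# ... enqueue tasks with put(), wire consumers with subscribe() ...
worker.stop()                     # stop all of the workers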

start(context: neuraxle.base.ExecutionContext)[source]

Start multiple processes or threads with the worker function as a target.

Parameters

context (ExecutionContext) – execution context


stop()[source]

Stop all of the workers.


class neuraxle.distributed.streaming.QueuedPipelineTask(data_container, step_name=None)[source]

Data object to contain the tasks processed by the queued pipeline.

See also

QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, SequentialQueuedPipeline
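For example (data_container standing in for any DataContainer batch):

task = QueuedPipelineTask(data_container, step_name='step_a')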

class neuraxle.distributed.streaming.SequentialQueuedPipeline(steps: List[Union[neuraxle.base.BaseStep, Tuple[int, neuraxle.base.BaseStep], Tuple[str, neuraxle.base.BaseStep], Tuple[str, int, neuraxle.base.BaseStep], Tuple[str, int, int, neuraxle.base.BaseStep], Tuple[str, int, List[Tuple], neuraxle.base.BaseStep]]], batch_size, n_workers_per_step=None, max_queue_size=None, data_joiner=None, use_threading=False, use_savers=False, cache_folder=None)[source]

Run all steps sequentially, even though each step runs in its own process or thread as a QueueWorker.

See also

QueueWorker, BaseQueuedPipeline, ParallelQueuedPipeline, QueueJoiner, Observer, Observable
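A minimal usage sketch (assuming Identity from neuraxle.base):

from neuraxle.base import Identity
from neuraxle.distributed.streaming import SequentialQueuedPipeline

# Steps still run in order, but each in its own worker, pipelined over batches.
p = SequentialQueuedPipeline([
    ('step_a', Identity()),
    ('step_b', Identity()),
], batch_size=10, n_workers_per_step=1, max_queue_size=5, use_threading=True)

outputs = p.transform(list(range(100)))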

connect_queued_pipeline()[source]

Sequentially connect the queued workers.

get_n_batches(data_container) → int[source]

Get the number of batches to process.

Parameters

data_container – data container to transform

Returns

number of batches

send_batch_to_queued_pipeline(batch_index: int, data_container: neuraxle.data_container.DataContainer)[source]

Send batches to process to the first queued worker.

Parameters
  • batch_index – batch index

  • data_container – data container batch


neuraxle.distributed.streaming.worker_function(queue_worker: neuraxle.distributed.streaming.QueueWorker, context: neuraxle.base.ExecutionContext, use_savers: bool, additional_worker_arguments)[source]

Worker function that transforms the items in the queue of batches to process.

Parameters
  • queue_worker – step to transform

  • context – execution context

  • use_savers – whether to reload the step inside the worker using its savers

  • additional_worker_arguments – any additional arguments that need to be passed to the workers

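In rough pseudocode, the worker loop might look like the following sketch (hypothetical; the real implementation differs, notably in error handling and in how the step is reloaded):

# Hypothetical sketch of a worker's main loop.
step = queue_worker.get_step()   # hypothetical accessor; with use_savers=True the
                                 # step would first be reloaded from disk via its savers
while True:
    task = queue_worker.get()    # blocking read of the next QueuedPipelineTask
    data_container = step.handle_transform(task.data_container, context)
    queue_worker.notify(QueuedPipelineTask(data_container, queue_worker.name))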