Parallel processing in Neuraxle

This example demonstrates how to stream data in parallel through a Neuraxle pipeline, and compares its execution time with that of an equivalent sequential pipeline.

Out:

SequentialQueuedPipeline
execution time: 0.44899415969848633 seconds
VanillaPipeline
execution time: 2.4164741039276123 seconds

import time
import numpy as np

from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.pipeline import Pipeline
from neuraxle.steps.loop import ForEachDataInput
from neuraxle.steps.misc import Sleep
from neuraxle.steps.numpy import MultiplyByN


def _timed_transform(pipeline, data_inputs):
    """Call ``pipeline.transform(data_inputs)`` and return ``(outputs, elapsed_seconds)``."""
    # perf_counter() is monotonic and high resolution; time.time() follows the
    # wall clock and can jump if the system time is adjusted mid-measurement.
    start = time.perf_counter()
    outputs = pipeline.transform(data_inputs)
    return outputs, time.perf_counter() - start


def main():
    """
    Time the same task in a queued (parallel) pipeline and in a plain pipeline.

    Each task does the following: for each data input, sleep 0.02 seconds,
    and then multiply by 2. Both pipelines transform ``list(range(100))``,
    and the execution time of each one is printed.

    :raises AssertionError: if the queued pipeline is not faster than the
        plain pipeline, or if the two pipelines produce different outputs.
    """
    sleep_time = 0.02

    # Process tasks of batch size 10 with 8 queued workers that have a
    # max queue size of 10.
    pipeline = SequentialQueuedPipeline([
        Pipeline([ForEachDataInput(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]),
    ], n_workers_per_step=8, max_queue_size=10, batch_size=10)

    outputs_streaming, time_queued_pipeline = _timed_transform(pipeline, list(range(100)))
    print('SequentialQueuedPipeline')
    print('execution time: {} seconds'.format(time_queued_pipeline))

    # Process the data inputs sequentially with the same task, as a baseline.
    pipeline = Pipeline([
        Pipeline([ForEachDataInput(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]),
    ])

    outputs_vanilla, time_vanilla_pipeline = _timed_transform(pipeline, list(range(100)))
    print('VanillaPipeline')
    print('execution time: {} seconds'.format(time_vanilla_pipeline))

    # The demo's claims: streaming is faster and yields identical results.
    assert time_queued_pipeline < time_vanilla_pipeline
    assert np.array_equal(outputs_streaming, outputs_vanilla)


# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()

Total running time of the script: ( 0 minutes 2.946 seconds)

Gallery generated by Sphinx-Gallery