neuraxle.pipeline

Module-level documentation for neuraxle.pipeline. [Inheritance diagram: the neuraxle.pipeline classes and their dependencies to other base modules of Neuraxle.]


Neuraxle’s Pipeline Classes

This is the core of Neuraxle's pipelines. You can chain steps to call them one after another.

Classes

Barrier([name, savers])

A Barrier step to be used in a minibatch sequential pipeline.

BasePipeline(steps)

Pipeline is a list of steps.

Joiner(batch_size, keep_incomplete_batch[, …])

The Joiner step joins the transformed mini-batches back together, mostly using the DACT.minibatches and then the DACT.extend method.

MiniBatchSequentialPipeline(steps, …[, …])

Mini Batch Sequential Pipeline class to create a pipeline that processes data inputs in mini-batches.

Pipeline(steps)

Handle methods implement the fit, transform, fit_transform and inverse_transform methods using a pipe and filter design pattern that processes the steps sequentially.

ZipMinibatchJoiner(batch_size, …[, …])

Zips together minibatch outputs, i.e. returns a DataContainer where the first element is a tuple of every minibatch's first element, and so on.


class neuraxle.pipeline.BasePipeline(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]

Bases: neuraxle.base.TruncableSteps, abc.ABC

Pipeline is a list of steps. This is the base class for all pipelines.

__init__(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]

Initialize self. See help(type(self)) for accurate signature.

fit(data_inputs, expected_outputs=None) → neuraxle.pipeline.BasePipeline[source]

Fit data inputs on the given expected outputs.

Parameters
  • data_inputs – data inputs

  • expected_outputs – expected outputs to fit on.

Returns

self

transform(data_inputs)[source]

Transform given data inputs.

Parameters

data_inputs – data inputs

Returns

transformed data inputs

fit_transform(data_inputs, expected_outputs=None) → Tuple[neuraxle.pipeline.BasePipeline, Any][source]

Fit and transform the steps with the given data inputs and expected outputs.

Parameters
  • data_inputs – data inputs

  • expected_outputs – expected outputs to fit on

Returns

(fitted self, transformed data inputs)

class neuraxle.pipeline.Pipeline(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]

Bases: neuraxle.pipeline.BasePipeline

Handle methods implement the fit, transform, fit_transform and inverse_transform methods using a pipe and filter design pattern that processes the steps sequentially. Each step is called with the previous step's output as input. The last step's output is returned as the final output.

Think of terminal commands chained together with the pipe character '|', but where each command is a step, and where each step can acquire some state by fitting on the transformed output of the previous step. This way, the steps can be chained together and the state of each step can be saved and reused in the end.

When a pipeline is fit, a new pipeline containing the fitted steps is returned.

Also check out the parallelized version of the pipeline, SequentialQueuedPipeline.
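
As a sketch of this sequential chaining, here is a minimal example; it assumes the MultiplyByN step from neuraxle.steps.numpy, used purely for illustration:

import numpy as np
from neuraxle.pipeline import Pipeline
from neuraxle.steps.numpy import MultiplyByN

p = Pipeline([
    MultiplyByN(2),  # first "command" in the pipe: x -> 2x
    MultiplyByN(3),  # second "command": 2x -> 6x
])
p, outputs = p.fit_transform(np.array([1, 2, 3]))
# outputs == [6, 12, 18]: each step received the previous step's output.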

__init__(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]

Initialize self. See help(type(self)) for accurate signature.

fit(data_inputs, expected_outputs=None) → neuraxle.pipeline.Pipeline[source]

After loading the last checkpoint, fit each pipeline step.

Parameters
  • data_inputs – the data input to fit on

  • expected_outputs – the expected data output to fit on

Returns

the pipeline itself

transform(data_inputs: Any)[source]

Transform the data inputs through each pipeline step, following the pipe and filter design pattern.

Parameters

data_inputs – the data input to transform

Returns

transformed data inputs

fit_transform(data_inputs, expected_outputs=None) → Tuple[neuraxle.pipeline.Pipeline, Any][source]

After loading the last checkpoint, fit and transform each pipeline step.

Parameters
  • data_inputs – the data input to fit on

  • expected_outputs – the expected data output to fit on

Returns

(the fitted pipeline, transformed data inputs)

inverse_transform(processed_outputs) → Any[source]

After transforming all data inputs and obtaining a prediction, we can inverse transform the processed outputs.

Parameters

processed_outputs – the forward transformed data input

Returns

backward transformed processed outputs
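
A minimal round-trip sketch; it assumes that MultiplyByN from neuraxle.steps.numpy implements inverse_transform as the division undoing its multiplication:

import numpy as np
from neuraxle.pipeline import Pipeline
from neuraxle.steps.numpy import MultiplyByN

p = Pipeline([MultiplyByN(2), MultiplyByN(5)])
outputs = p.transform(np.array([1.0, 2.0]))  # forward: [10., 20.]
restored = p.inverse_transform(outputs)      # inverses applied in reverse order
# restored is approximately [1., 2.] again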

fit_data_container(data_container) → neuraxle.pipeline.Pipeline[source]
transform_data_container(data_container: neuraxle.data_container.DataContainer)[source]
fit_transform_data_container(data_container) → Tuple[neuraxle.pipeline.Pipeline, neuraxle.data_container.DataContainer][source]
_fit_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.pipeline.Pipeline[source]

After loading the last checkpoint, fit and transform each pipeline step, but only fit (without transforming) the last pipeline step.

Parameters
  • data_container – the data container to fit on

  • context – execution context

Returns

the fitted pipeline

_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

After loading the last checkpoint, transform the data container through each pipeline step.

Return type

DataContainer

Parameters

data_container (DataContainer) – the data container to transform

Returns

transformed data container

_fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.pipeline.Pipeline, neuraxle.data_container.DataContainer][source]

After loading the last checkpoint, fit and transform each pipeline step.

Parameters
  • data_container – the data container to fit and transform

  • context – execution context

Returns

tuple(pipeline, data_container)

_inverse_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

After transforming all data inputs and obtaining a prediction, we can inverse transform the processed outputs.

class neuraxle.pipeline.MiniBatchSequentialPipeline(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]], batch_size=None, keep_incomplete_batch: bool = None, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None, mute_joiner_batch_size_warning: bool = True)[source]

Bases: neuraxle.base._CustomHandlerMethods, neuraxle.base.ForceHandleMixin, neuraxle.pipeline.Pipeline

Mini Batch Sequential Pipeline class to create a pipeline that processes data inputs in mini-batches.

Provide a default batch size:

data_inputs = list(range(10))
pipeline = MiniBatchSequentialPipeline([
    SomeStep()
], batch_size=2)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

pipeline = MiniBatchSequentialPipeline([
    SomeStep()
], batch_size=3, keep_incomplete_batch=False)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

pipeline = MiniBatchSequentialPipeline(
    [SomeStep()],
    batch_size=3,
    keep_incomplete_batch=True,
    default_value_data_inputs=None,
    default_value_expected_outputs=None
)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]

pipeline = MiniBatchSequentialPipeline(
    [SomeStep()],
    batch_size=3,
    keep_incomplete_batch=True,
    default_value_data_inputs=StripAbsentValues()
)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

Or manually add one or multiple Barrier steps to the mini-batch sequential pipeline:

data_inputs = list(range(10))
pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(batch_size=2)
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(batch_size=3, keep_incomplete_batch=False)
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(
        batch_size=3,
        keep_incomplete_batch=True,
        default_value_data_inputs=None,
        default_value_expected_outputs=None
    )
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]

pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(
        batch_size=3,
        keep_incomplete_batch=True,
        default_value_data_inputs=StripAbsentValues()
    )
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

Note that the default value for non-stripped IDs is None when not stripping incomplete batches of data inputs or expected outputs.

Parameters
  • steps – pipeline steps

  • batch_size – number of elements to combine into a single batch

  • keep_incomplete_batch – (Optional.) A bool representing whether the last batch should be dropped in the case it has fewer than batch_size elements; the default behavior is not to drop the smaller batch.

  • default_value_data_inputs – data inputs' default fill value for padding and values outside the iteration range, or StripAbsentValues to trim absent values from the batch

  • default_value_expected_outputs – expected outputs' default fill value for padding and values outside the iteration range, or StripAbsentValues to trim absent values from the batch

  • mute_joiner_batch_size_warning – If False, will log a warning when automatically setting the joiner batch_size attribute.

__init__(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]], batch_size=None, keep_incomplete_batch: bool = None, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None, mute_joiner_batch_size_warning: bool = True)[source]

Initialize self. See help(type(self)) for accurate signature.

set_batch_size(batch_size)[source]
_validate_barriers_batch_size(batch_size)[source]
_patch_barriers_batch_size(batch_size)[source]
_patch_missing_barrier(batch_size: int, keep_incomplete_batch: bool, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]
joiner
body
transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Transform all sub-pipelines split by the Barrier steps.

Parameters
  • data_container (DataContainer) – data container to transform

  • context (ExecutionContext) – execution context

Returns

transformed data container

fit_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseStep[source]

Fit all sub-pipelines split by the Barrier steps.

Parameters
  • data_container (DataContainer) – data container to fit on

  • context (ExecutionContext) – execution context

Returns

the fitted pipeline

fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.base.BaseStep, neuraxle.data_container.DataContainer][source]

Fit and transform all sub-pipelines split by the Barrier steps.

Parameters
  • data_container (DataContainer) – data container to fit and transform

  • context (ExecutionContext) – execution context

Returns

(fitted pipeline, transformed data container)

_split_on_barriers() → List[neuraxle.pipeline.MiniBatchSequentialPipeline][source]

Create sub-pipelines by splitting the steps by the join type name.

Returns

list of sub-pipelines

class neuraxle.pipeline.Barrier(name=None, savers=None)[source]

Bases: neuraxle.base.Identity, abc.ABC

A Barrier step to be used in a minibatch sequential pipeline. It forces all the data inputs to get through the barrier within a sub-pipeline before moving on to the next sub-pipeline.

p = MiniBatchSequentialPipeline([
    SomeStep(),
    SomeStep(),
    Barrier(),  # must be a concrete Barrier, e.g. Joiner(batch_size=10)
    SomeStep(),
    SomeStep(),
    Barrier(),  # must be a concrete Barrier, e.g. Joiner(batch_size=10)
], batch_size=10)

join_transform(step: neuraxle.base.TruncableSteps, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Execute the given pipeline transform() with the given data container and execution context.

Parameters
  • step (TruncableSteps) – truncable steps to execute

  • data_container (DataContainer) – data container

  • context (ExecutionContext) – execution context

Returns

transformed data container

join_fit_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[Any, neuraxle.data_container.DataContainer][source]

Execute the given pipeline fit_transform() with the given data container and execution context.

Parameters
  • step (Pipeline) – truncable steps to execute

  • data_container (DataContainer) – data container

  • context (ExecutionContext) – execution context

Returns

(fitted step, transformed data container)

class neuraxle.pipeline.Joiner(batch_size: int, keep_incomplete_batch: bool = True, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None)[source]

Bases: neuraxle.pipeline.Barrier

The Joiner step joins the transformed mini-batches back together, mostly using the DACT.minibatches and then the DACT.extend method. It is used in a minibatch sequential pipeline and in streaming / queued pipelines as a way to handle the batches of previous steps in the pipeline.

See also

DataContainer, batch()
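
Conceptually, the join can be sketched as below; this is a hedged illustration that assumes minibatches() yields DataContainer batches and extend() concatenates one container onto another, as referenced above:

from neuraxle.data_container import DataContainer

dact = DataContainer(data_inputs=list(range(6)), expected_outputs=None)
rejoined = DataContainer(data_inputs=[], expected_outputs=[])
for batch in dact.minibatches(batch_size=2):
    # ... the preceding sub-pipeline would transform `batch` here ...
    rejoined.extend(batch)  # joined back together, batch by batch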

__init__(batch_size: int, keep_incomplete_batch: bool = True, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None)[source]

The Joiner step joins the transformed mini-batches together, using the DACT.minibatches and then the DACT.extend method. Note that the default value for IDs is None.

join_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]

Concatenate the pipeline transform output of each batch of self.batch_size together.

Parameters
  • step (Pipeline) – pipeline to transform on

  • data_container (DataContainer) – data container to transform

  • context (ExecutionContext) – execution context

Returns

transformed data container

join_fit_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[Any, neuraxle.data_container.DataContainer][source]

Concatenate the pipeline fit_transform output of each batch of self.batch_size together.

Parameters
  • step (Pipeline) – pipeline to fit-transform on

  • data_container (DataContainer) – data container to fit-transform on

  • context (ExecutionContext) – execution context

Returns

(fitted self, transformed data container)

class neuraxle.pipeline.ZipMinibatchJoiner(batch_size: int, keep_incomplete_batch: bool = True, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None)[source]

Bases: neuraxle.pipeline.Joiner

Zips together minibatch outputs, i.e. returns a DataContainer where the first element is a tuple of every minibatch's first element, and so on.
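
For instance, consider this minimal sketch; it assumes Identity from neuraxle.base passes data through unchanged:

from neuraxle.base import Identity
from neuraxle.pipeline import MiniBatchSequentialPipeline, ZipMinibatchJoiner

p = MiniBatchSequentialPipeline([
    Identity(),
    ZipMinibatchJoiner(batch_size=2),
])
outputs = p.transform(list(range(6)))
# The minibatches [0, 1], [2, 3], [4, 5] are zipped elementwise rather than
# concatenated: the first output element is (0, 2, 4), the second is (1, 3, 5).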

join_transform(step: neuraxle.base.TruncableSteps, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.ZipDataContainer[source]

Concatenate the pipeline transform output of each batch of self.batch_size together.

Parameters
  • step (Pipeline) – pipeline to transform on

  • data_container (DataContainer) – data container to transform

  • context (ExecutionContext) – execution context

Returns

transformed data container

join_fit_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[Any, neuraxle.data_container.DataContainer][source]

Concatenate the pipeline fit_transform output of each batch of self.batch_size together.

Parameters
  • step (Pipeline) – pipeline to fit-transform on

  • data_container (DataContainer) – data container to fit-transform on

  • context (ExecutionContext) – execution context

Returns

(fitted self, transformed data container)
