neuraxle.pipeline¶
Module-level documentation for neuraxle.pipeline. Here is an inheritance diagram, including dependencies to other base modules of Neuraxle:
Neuraxle’s Pipeline Classes¶
This is the core of Neuraxle's pipelines. You can chain steps to call them one after another.
Classes
- Barrier: A Barrier step to be used in a minibatch sequential pipeline.
- BasePipeline: Pipeline is a list of steps.
- Joiner: The Joiner step joins the transformed mini batches together, mostly with the DACT.minibatches and DACT.extend methods.
- MiniBatchSequentialPipeline: Mini Batch Sequential Pipeline class to create a pipeline processing data inputs in batches.
- Pipeline: Handle methods are used to handle the fit, transform, fit_transform and inverse_transform methods using a pipe and filter design pattern to handle the steps sequentially.
- ZipMinibatchJoiner: Zips together minibatch outputs.
Examples using neuraxle.pipeline.BasePipeline¶
Examples using neuraxle.pipeline.MiniBatchSequentialPipeline¶
Examples using neuraxle.pipeline.Pipeline¶
class neuraxle.pipeline.BasePipeline(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]¶
Bases: neuraxle.base.TruncableSteps, abc.ABC
Pipeline is a list of steps. This is the base class for all pipelines.
__init__(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]¶
Initialize self. See help(type(self)) for accurate signature.
fit(data_inputs, expected_outputs=None) → neuraxle.pipeline.BasePipeline[source]¶
Fit data inputs on the given expected outputs.
- Parameters
data_inputs – data inputs
expected_outputs – expected outputs to fit on.
- Returns
self
transform(data_inputs)[source]¶
Transform given data inputs.
- Parameters
data_inputs – data inputs
- Returns
transformed data inputs
fit_transform(data_inputs, expected_outputs=None) → Tuple[neuraxle.pipeline.BasePipeline, Any][source]¶
Fit and transform the steps with the given data inputs and expected outputs.
- Parameters
data_inputs – data inputs
expected_outputs – expected outputs to fit on
- Returns
(fitted self, transformed data inputs)
class neuraxle.pipeline.Pipeline(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]¶
Bases: neuraxle.pipeline.BasePipeline
Handle methods are used to handle the fit, transform, fit_transform and inverse_transform methods using a pipe and filter design pattern to handle the steps sequentially. Each step is called with the previous step's output as input. The last step's output is returned as the final output.
Think of it like terminal pipe commands chained together with the character '|', but where each command is a step, and where each step can acquire some state from fitting on the transformed output of the previous step. This way, the steps can be chained together and the state of each step can be saved and reused in the end.
When a pipeline is fit, a new pipeline containing the fitted steps is returned.
Also check out the parallelized version of the pipeline, SequentialQueuedPipeline.
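To make the pipe and filter idea concrete, here is a minimal, framework-free sketch of the pattern described above. The step and pipeline classes below are hypothetical stand-ins for illustration only, not Neuraxle's actual BaseStep API:

```python
class MultiplyBy:
    """A stateless step: multiplies every input by a constant."""
    def __init__(self, factor):
        self.factor = factor

    def fit(self, data_inputs, expected_outputs=None):
        return self  # nothing to learn

    def transform(self, data_inputs):
        return [x * self.factor for x in data_inputs]


class MeanCenter:
    """A stateful step: learns the mean at fit time, subtracts it at transform time."""
    def __init__(self):
        self.mean_ = None

    def fit(self, data_inputs, expected_outputs=None):
        self.mean_ = sum(data_inputs) / len(data_inputs)
        return self

    def transform(self, data_inputs):
        return [x - self.mean_ for x in data_inputs]


class SimplePipeline:
    """Chains steps: each step fits on, then transforms, the previous step's output."""
    def __init__(self, steps):
        self.steps = steps

    def fit(self, data_inputs, expected_outputs=None):
        for step in self.steps:
            step.fit(data_inputs, expected_outputs)
            data_inputs = step.transform(data_inputs)
        return self

    def transform(self, data_inputs):
        for step in self.steps:
            data_inputs = step.transform(data_inputs)
        return data_inputs


p = SimplePipeline([MultiplyBy(2), MeanCenter()])
p.fit([1, 2, 3])
out = p.transform([1, 2, 3])  # [-2.0, 0.0, 2.0]
```

Note how MeanCenter fits on the output of MultiplyBy (mean of [2, 4, 6] is 4.0), which is the key property of the pattern: each step's state is acquired from the transformed output of the previous step.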
__init__(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]])[source]¶
Initialize self. See help(type(self)) for accurate signature.
fit(data_inputs, expected_outputs=None) → neuraxle.pipeline.Pipeline[source]¶
After loading the last checkpoint, fit each pipeline step.
- Parameters
data_inputs – the data input to fit on
expected_outputs – the expected data output to fit on
- Returns
the pipeline itself
transform(data_inputs: Any)[source]¶
Transform the data inputs through each pipeline step with the pipe and filter design pattern.
- Parameters
data_inputs – the data input to transform
- Returns
transformed data inputs
fit_transform(data_inputs, expected_outputs=None) → Tuple[neuraxle.pipeline.Pipeline, Any][source]¶
After loading the last checkpoint, fit transform each pipeline step.
- Parameters
data_inputs – the data input to fit on
expected_outputs – the expected data output to fit on
- Returns
(the fitted pipeline, the transformed data inputs)
inverse_transform(processed_outputs) → Any[source]¶
After transforming all data inputs and obtaining a prediction, we can inverse transform the processed outputs.
- Parameters
processed_outputs – the forward transformed data input
- Returns
backward transformed processed outputs
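The forward/backward round trip can be sketched in plain Python. The classes below are hypothetical stand-ins, not Neuraxle's API; the point is that inverse_transform walks the steps in reverse order:

```python
class Scale:
    """An invertible step: multiplies on transform, divides on inverse_transform."""
    def __init__(self, factor):
        self.factor = factor

    def transform(self, data_inputs):
        return [x * self.factor for x in data_inputs]

    def inverse_transform(self, processed_outputs):
        return [y / self.factor for y in processed_outputs]


class InvertiblePipeline:
    """Applies steps forward in order, then inverse transforms in reverse order."""
    def __init__(self, steps):
        self.steps = steps

    def transform(self, data_inputs):
        for step in self.steps:
            data_inputs = step.transform(data_inputs)
        return data_inputs

    def inverse_transform(self, processed_outputs):
        # undo the last step first, then the one before it, and so on
        for step in reversed(self.steps):
            processed_outputs = step.inverse_transform(processed_outputs)
        return processed_outputs


p = InvertiblePipeline([Scale(2), Scale(5)])
out = p.transform([1.0, 2.0])    # [10.0, 20.0]
back = p.inverse_transform(out)  # [1.0, 2.0]
```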
fit_transform_data_container(data_container) → Tuple[neuraxle.pipeline.Pipeline, neuraxle.data_container.DataContainer][source]¶
_fit_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.pipeline.Pipeline[source]¶
After loading the last checkpoint, fit transform each pipeline step, but only fit (without transforming) the last pipeline step.
- Parameters
data_container (DataContainer) – the data container to fit transform on
context (ExecutionContext) – execution context
- Returns
the fitted pipeline
_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
After loading the last checkpoint, transform the data container through each pipeline step.
- Parameters
data_container (DataContainer) – the data container to transform
- Returns
transformed data container
_fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.pipeline.Pipeline, neuraxle.data_container.DataContainer][source]¶
After loading the last checkpoint, fit transform each pipeline step.
- Parameters
data_container (DataContainer) – the data container to fit transform on
context (ExecutionContext) – execution context
- Returns
tuple(pipeline, data_container)
_inverse_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
After transforming all data inputs and obtaining a prediction, we can inverse transform the processed outputs.
class neuraxle.pipeline.MiniBatchSequentialPipeline(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]], batch_size=None, keep_incomplete_batch: bool = None, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None, mute_joiner_batch_size_warning: bool = True)[source]¶
Bases: neuraxle.base._CustomHandlerMethods, neuraxle.base.ForceHandleMixin, neuraxle.pipeline.Pipeline
Mini Batch Sequential Pipeline class to create a pipeline processing data inputs in batches.
Provide a default batch size:

data_inputs = list(range(10))

pipeline = MiniBatchSequentialPipeline([SomeStep()], batch_size=2)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

pipeline = MiniBatchSequentialPipeline([SomeStep()], batch_size=3, keep_incomplete_batch=False)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

pipeline = MiniBatchSequentialPipeline(
    [SomeStep()],
    batch_size=3,
    keep_incomplete_batch=True,
    default_value_data_inputs=None,
    default_value_expected_outputs=None
)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]

pipeline = MiniBatchSequentialPipeline(
    [SomeStep()],
    batch_size=3,
    keep_incomplete_batch=True,
    default_value_data_inputs=StripAbsentValues()
)
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Or manually add one or multiple Barrier steps to the mini batch sequential pipeline:
data_inputs = list(range(10))

pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(batch_size=2)
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(batch_size=3, keep_incomplete_batch=False)
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8]]

pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(
        batch_size=3,
        keep_incomplete_batch=True,
        default_value_data_inputs=None,
        default_value_expected_outputs=None
    )
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]

pipeline = MiniBatchSequentialPipeline([
    SomeStep(),
    Joiner(
        batch_size=3,
        keep_incomplete_batch=True,
        default_value_data_inputs=StripAbsentValues()
    )
])
pipeline.transform(data_inputs)
# SomeStep will receive: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Note that the default fill value for IDs is None when incomplete batches of data inputs or expected outputs are padded rather than stripped.
- Parameters
steps – pipeline steps
batch_size – number of elements to combine into a single batch
keep_incomplete_batch – (Optional.) A bool representing whether the last batch should be dropped if it has fewer than batch_size elements; the default behavior is not to drop the smaller batch.
default_value_data_inputs – data_inputs default fill value for padding and values outside the iteration range, or StripAbsentValues to trim absent values from the batch
default_value_expected_outputs – expected_outputs default fill value for padding and values outside the iteration range, or StripAbsentValues to trim absent values from the batch
mute_joiner_batch_size_warning – If False, will log a warning when automatically setting the joiner batch_size attribute.
See also
DataContainer, minibatches(), StripAbsentValues, Pipeline, Barrier, Joiner, ExecutionContext, SequentialQueuedPipeline
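The batching rules above can be sketched without the framework. The StripAbsentValues class below is a hypothetical stand-in for neuraxle.data_container.StripAbsentValues, and the minibatches function is illustrative, not Neuraxle's actual implementation:

```python
class StripAbsentValues:
    """Sentinel: trim the incomplete batch instead of padding it."""


def minibatches(data_inputs, batch_size, keep_incomplete_batch=True,
                default_value=StripAbsentValues()):
    batches = []
    for i in range(0, len(data_inputs), batch_size):
        batch = list(data_inputs[i:i + batch_size])
        if len(batch) < batch_size:
            if not keep_incomplete_batch:
                continue  # drop the smaller batch entirely
            if not isinstance(default_value, StripAbsentValues):
                # pad the incomplete batch up to batch_size with the fill value
                batch += [default_value] * (batch_size - len(batch))
            # else: keep the batch as-is, trimmed to its actual length
        batches.append(batch)
    return batches


data = list(range(10))
minibatches(data, 3, keep_incomplete_batch=False)
# [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
minibatches(data, 3, keep_incomplete_batch=True, default_value=None)
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, None, None]]
minibatches(data, 3)
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```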
__init__(steps: List[Union[Tuple[str, BaseTransformerT], BaseTransformerT]], batch_size=None, keep_incomplete_batch: bool = None, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None, mute_joiner_batch_size_warning: bool = True)[source]¶
Initialize self. See help(type(self)) for accurate signature.
_patch_missing_barrier(batch_size: int, keep_incomplete_batch: bool, default_value_data_inputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None, default_value_expected_outputs: Union[Any, neuraxle.data_container.StripAbsentValues] = None)[source]¶
joiner¶
body¶
transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
Transform all sub pipelines split by the Barrier steps.
- Parameters
data_container (DataContainer) – data container to transform
context (ExecutionContext) – execution context
- Returns
transformed data container
fit_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.base.BaseStep[source]¶
Fit all sub pipelines split by the Barrier steps.
- Parameters
data_container (DataContainer) – data container to fit on
context (ExecutionContext) – execution context
- Returns
the fitted pipeline
fit_transform_data_container(data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[neuraxle.base.BaseStep, neuraxle.data_container.DataContainer][source]¶
Fit transform all sub pipelines split by the Barrier steps.
- Parameters
data_container (DataContainer) – data container to fit transform on
context (ExecutionContext) – execution context
- Returns
(fitted pipeline, data container)
_split_on_barriers() → List[neuraxle.pipeline.MiniBatchSequentialPipeline][source]¶
Create sub pipelines by splitting the steps by the joiner type name.
- Returns
list of sub pipelines
class neuraxle.pipeline.Barrier(name=None, savers=None)[source]¶
Bases: neuraxle.base.Identity, abc.ABC
A Barrier step to be used in a minibatch sequential pipeline. It forces all the data inputs to get to the barrier in a sub pipeline before going through to the next sub pipeline.

p = MiniBatchSequentialPipeline([
    SomeStep(),
    SomeStep(),
    Barrier(),  # must be a concrete Barrier, ex: Joiner()
    SomeStep(),
    SomeStep(),
    Barrier(),  # must be a concrete Barrier, ex: Joiner()
], batch_size=10)
join_transform(step: neuraxle.base.TruncableSteps, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
Execute the given pipeline transform() with the given data container and execution context.
- Parameters
step (TruncableSteps) – truncable steps to execute
data_container (DataContainer) – data container
context (ExecutionContext) – execution context
- Returns
transformed data container
join_fit_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[Any, neuraxle.data_container.DataContainer][source]¶
Execute the given pipeline fit_transform() with the given data container and execution context.
- Parameters
step (Pipeline) – truncable steps to execute
data_container (DataContainer) – data container
context (ExecutionContext) – execution context
- Returns
(fitted step, transformed data container)
class neuraxle.pipeline.Joiner(batch_size: int, keep_incomplete_batch: bool = True, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None)[source]¶
Bases: neuraxle.pipeline.Barrier
The Joiner step joins the transformed mini batches together, mostly with the DACT.minibatches and DACT.extend methods. It is used in minibatch sequential pipelines and in streaming / queued pipelines as a way to handle batches of the previous steps in the pipeline.
See also
DataContainer, batch()
__init__(batch_size: int, keep_incomplete_batch: bool = True, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None)[source]¶
The Joiner step joins the transformed mini batches together with the DACT.minibatches and DACT.extend methods. Note that the default value for IDs is None.
join_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.DataContainer[source]¶
Concatenate the pipeline transform output of each batch of self.batch_size together.
- Parameters
step (Pipeline) – pipeline to transform on
data_container (DataContainer) – data container to transform
context (ExecutionContext) – execution context
- Returns
transformed data container
join_fit_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[Any, neuraxle.data_container.DataContainer][source]¶
Concatenate the pipeline fit transform output of each batch of self.batch_size together.
- Parameters
step (Pipeline) – pipeline to fit transform on
data_container (DataContainer) – data container to fit transform on
context (ExecutionContext) – execution context
- Returns
(fitted self, transformed data container)
class neuraxle.pipeline.ZipMinibatchJoiner(batch_size: int, keep_incomplete_batch: bool = True, default_value_data_inputs=<neuraxle.data_container.StripAbsentValues object>, default_value_expected_outputs=None)[source]¶
Bases: neuraxle.pipeline.Joiner
Zips together minibatch outputs, i.e. returns a DataContainer where the first element is a tuple of every minibatch's first element, and so on.
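The zip join, as opposed to the Joiner's extend join, can be sketched in one line of plain Python: the i-th element of the result is a tuple of every minibatch output's i-th element. The function name is illustrative, not Neuraxle's actual API:

```python
def zip_join(minibatch_outputs):
    """Zip minibatch outputs element-wise into tuples."""
    return list(zip(*minibatch_outputs))


batches = [[0, 1, 2], [10, 11, 12], [20, 21, 22]]
zip_join(batches)
# [(0, 10, 20), (1, 11, 21), (2, 12, 22)]
```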
join_transform(step: neuraxle.base.TruncableSteps, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → neuraxle.data_container.ZipDataContainer[source]¶
Zip together the pipeline transform output of each batch of self.batch_size.
- Parameters
step (TruncableSteps) – pipeline to transform on
data_container (DataContainer) – data container to transform
context (ExecutionContext) – execution context
- Returns
transformed data container
join_fit_transform(step: neuraxle.pipeline.Pipeline, data_container: neuraxle.data_container.DataContainer, context: neuraxle.base.ExecutionContext) → Tuple[Any, neuraxle.data_container.DataContainer][source]¶
Zip together the pipeline fit transform output of each batch of self.batch_size.
- Parameters
step (Pipeline) – pipeline to fit transform on
data_container (DataContainer) – data container to fit transform on
context (ExecutionContext) – execution context
- Returns
(fitted self, transformed data container)