Introduction

Over the years of working on machine learning projects, we’ve come across some software design patterns that simply cannot be avoided in serious work.

You’ve probably coded machine learning projects in the past. If so, you’ve probably run into difficulties working with Jupyter notebooks and organizing your code. You’re not alone: machine learning is still young, and the software practices for doing it properly aren’t well established yet. Having completed many machine learning projects, we’ve learned which design patterns to reach for.

First, let’s see why it’s important to properly encapsulate your data transformers and your models into objects. Then, let’s see how to chain those steps into a machine learning pipeline so that everything flows correctly and in the right order. Finally, let’s dig into some pipeline functionalities that are surprisingly useful for managing everything.

The following talk by Guillaume Chevalier covers the content of this page in more depth, as well as that of other pages of this documentation site.


Encapsulate Models and Data Transformers

You may be tempted to write simple code like this:

import numpy as np

# train
mean = np.expand_dims(np.mean(train_data, axis=-1), axis=-1)
train_data = train_data - mean
std = np.expand_dims(np.std(train_data, axis=-1), axis=-1)
train_data = train_data / std
for epoch in range(10):
    train_data, y_train_expected = shuffle(train_data, y_train_expected)
    model = model.fit(train_data, y_train_expected)

# test
test_data = (test_data - mean) / std
y_predicted_test = model.predict(test_data)

However, this approach will backfire soon. Prefer the following:

import numpy as np

# train
norm = Normalizer()
train_data = norm.fit_transform(train_data)
for epoch in range(10):
    train_data, y_train_expected = shuffle(train_data, y_train_expected)
    model = model.fit(train_data, y_train_expected)

# test
test_data = norm.transform(test_data)
y_predicted_test = model.predict(test_data)

By having encapsulated your mathematical logic in objects, your code will be much cleaner: you handle the details of the math within your objects, and the flow of data outside of them. This is important to reduce the mental load while reading the code, and to avoid bugs.

Here is what the Normalizer transformer implementation looks like:

from neuraxle.base import BaseStep
import numpy as np

class Normalizer(BaseStep):
    def fit(self, data_inputs, expected_outputs=None):
        self.mean = np.expand_dims(np.mean(data_inputs, axis=-1), axis=-1)
        self.std = np.expand_dims(np.std(data_inputs, axis=-1), axis=-1)
        return self

    def transform(self, data_inputs):
        return (data_inputs - self.mean) / self.std

Note that you can build really powerful steps that handle the flow of data from one step to another. Steps can loop, flatten/unflatten, reshape, normalize then unnormalize, choose one of their substeps, concatenate features together, and much more. Check out the Complete API Documentation for an overview of all the steps that are built into Neuraxle. See the Handler Methods page to see how it’s done.
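For instance, here is a minimal sketch of a transform-only step, following the same BaseStep pattern as the Normalizer above (the Reshaper step and its target shape are hypothetical examples for illustration, not built-in Neuraxle steps):

from neuraxle.base import BaseStep
import numpy as np

class Reshaper(BaseStep):
    """Hypothetical example step that reshapes the data inputs to a given shape."""

    def __init__(self, new_shape):
        BaseStep.__init__(self)
        self.new_shape = new_shape

    def fit(self, data_inputs, expected_outputs=None):
        # Nothing to learn: reshaping is stateless.
        return self

    def transform(self, data_inputs):
        return np.reshape(data_inputs, self.new_shape)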

Pipe and Filter

Let’s revisit the code example just above. You fit a normalizer, and then you transform the test data with it. We suggest refactoring the code further, as follows:

import numpy as np
from neuraxle.pipeline import Pipeline
from neuraxle.steps.data import DataShuffler

# train
pipeline = Pipeline([
    Normalizer(),
    DataShuffler(),
    Model()
])

for epoch in range(10):
    pipeline = pipeline.fit(train_data, y_train_expected)

# test
y_predicted_test = pipeline.predict(test_data)

This way, your code will be much more flexible and cleaner. Your main code is now just a few lines long, yet you can still see what it is made of and how the data flows from one class to another: the pipeline object chains calls from one of the steps it contains to the next, and so forth.

Features

Let’s add features based on the transformations of different columns. Use the ColumnTransformer step to create features based on the N-dimensional (ND) input data:

from neuraxle.pipeline import Pipeline
from neuraxle.steps.column_transformer import ColumnTransformer

pipeline = Pipeline([
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    DataShuffler(),
    Model()
])

Wrapper (a.k.a. Decorator) classes

If you paid close enough attention to the examples above, you’ll have noticed that the last example is flawed: the data shuffler should be activated only during train mode.

Replace this:

DataShuffler(),

With this:

TrainOnlyWrapper(DataShuffler()),
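The TrainOnlyWrapper only activates its wrapped step when the pipeline is in train mode. As a usage sketch (assuming the set_train toggle that steps expose; double-check the exact method in the API reference), you could explicitly flip the pipeline out of train mode before evaluating:

# Assumption for illustration: steps expose a set_train(is_train) toggle.
pipeline.set_train(False)  # evaluation/serving mode: the DataShuffler is now skipped
y_predicted_test = pipeline.predict(test_data)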

Pipelines for Minibatching and Parallel Processing

If your model is a deep learning model, you’ll want mini-batches as well. Use a MiniBatchSequentialPipeline like this instead, let’s say with batch_size=128:

from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.flow import TrainOnlyWrapper
from neuraxle.steps.data import DataShuffler

pipeline = Pipeline([
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        Model()
    ], batch_size=128)
])

Note that if you’d like to use queues between each step of the pipeline to do parallel processing, giving each of your worker threads a chance to transform part of the data for you (especially at test time in production), you can use a SequentialQueuedPipeline as a simple drop-in replacement for the Pipeline above.
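As a minimal sketch of that swap, reusing the same n_workers_per_step, max_queue_size and batch_size arguments that appear in the lazy-loading example further below (DateToCosineEncoder, CategoricalEnum, Normalizer and Model are the same placeholder steps as above):

from neuraxle.pipeline import Pipeline
from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.flow import TrainOnlyWrapper
from neuraxle.steps.data import DataShuffler

pipeline = Pipeline([
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    # The queued pipeline processes the model's mini-batches in parallel workers.
    SequentialQueuedPipeline([
        Model()
    ], n_workers_per_step=8, max_queue_size=1024, batch_size=128)
])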

Repository for lazy data loading

Suppose you’d like your data to be loaded from a repository. A repository is an object you interact with to get the data: here, for instance, to get the data IDs first, and then the values for those IDs. See how this can be done here:

from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.base import ExecutionContext
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.data import DataShuffler
from neuraxle.steps.flow import TrainOnlyWrapper

training_data_ids = training_data_repository.get_all_ids()
context = ExecutionContext('caching_folder').set_service_locator({
    BaseRepository: training_data_repository
})

pipeline = Pipeline([
    ConvertIDsToLoadedData().assert_has_services(BaseRepository),
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        Model()
    ], batch_size=128)
]).with_context(context)

You can already see how powerful this can be for efficiently using data sampling techniques (undersampling, oversampling) and doing data augmentation without blowing up your RAM usage.

The example above isn’t even ideal yet: a next step would be to move the normalizer deeper inside the pipeline, so that truly only one mini-batch needs to be loaded at a time. The only side effect is that the Normalizer will keep fitting throughout the learning: it will now change a little bit at each mini-batch it sees, the mini-batches being loaded one by one and never all at once:

from neuraxle.pipeline import Pipeline
from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.data import DataShuffler
from neuraxle.steps.flow import TrainOnlyWrapper

pipeline = Pipeline([
    TrainOnlyWrapper(DataShuffler()),
    SequentialQueuedPipeline([
        ConvertIDsToLoadedData(),
        ColumnTransformer([
            (range(0, 2), DateToCosineEncoder()),
            (3, CategoricalEnum(categories_count=5, starts_at_zero=True)),
        ]),
        Normalizer(),
        Model()
    ], n_workers_per_step=8, max_queue_size=1024, batch_size=128)
])

The importance of using repositories can easily be overlooked. They matter because not only do they keep you from blowing up your RAM, they also let you abstract away how the data is loaded. You don’t care which database the data comes from, as long as you’re passed an instance of your repository that is preconfigured to fetch the data from the right place, whatever the format of your database is. Data is known to change a lot over time, and changing your database shouldn’t cause a massive rewrite of your code. Rather, add a new repository implementation, or reconfigure your repository, when the data source changes. Swapping one repository for another later on, when your data lives in a different data source in production, will be easy. Robert C. Martin (a.k.a. Uncle Bob) discusses this in depth in his talk on the principles of clean software architecture.
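As a sketch of what such an abstraction could look like (this interface is a hypothetical illustration, not Neuraxle’s own BaseRepository API; only get_all_ids is taken from the example above):

from abc import ABC, abstractmethod

class BaseRepository(ABC):
    """Hypothetical repository interface: the pipeline only depends on this abstraction."""

    @abstractmethod
    def get_all_ids(self) -> list:
        pass

    @abstractmethod
    def get_values_for_ids(self, ids: list) -> list:
        pass

class InMemoryTrainingDataRepository(BaseRepository):
    """Hypothetical implementation: swap it for a database-backed one without touching the pipeline."""

    def __init__(self, values_by_id: dict):
        self.values_by_id = values_by_id

    def get_all_ids(self) -> list:
        return list(self.values_by_id.keys())

    def get_values_for_ids(self, ids: list) -> list:
        return [self.values_by_id[i] for i in ids]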

Training your pipeline

To optimize your pipeline, you need to set hyperparameters and hyperparameter spaces. Let’s add a few hyperparams to our Model step:

from neuraxle.pipeline import Pipeline
from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace
from neuraxle.hyperparams.distributions import RandInt

pipeline = Pipeline([
    ...
    Model().set_hyperparams(HyperparameterSamples({
        'hidden_dim': 12,
        ...
    })).set_hyperparams_space(HyperparameterSpace({
        'hidden_dim': RandInt(6, 750),
        ...
    }))
    ...
])

Note that it would be much cleaner to define those inside the Model step implementation (Guillaume Chevalier likes to have them as a class constant). See the page Hyperparams And Distributions for more information.
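For instance, here is a minimal sketch of that idea, assuming the BaseStep constructor accepts hyperparams and hyperparams_space arguments (check the Hyperparams And Distributions page for the exact API):

from neuraxle.base import BaseStep
from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace
from neuraxle.hyperparams.distributions import RandInt

class Model(BaseStep):
    # Default hyperparams and their search space, kept as class constants within the step.
    HYPERPARAMS = HyperparameterSamples({'hidden_dim': 12})
    HYPERPARAMS_SPACE = HyperparameterSpace({'hidden_dim': RandInt(6, 750)})

    def __init__(self):
        BaseStep.__init__(
            self,
            hyperparams=self.HYPERPARAMS,
            hyperparams_space=self.HYPERPARAMS_SPACE
        )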

The next step is to tune hyperparams. See how you can easily create an AutoML loop that searches for the best hyperparams:

from neuraxle.metaopt.auto_ml import AutoML, HyperparamsJSONRepository, ValidationSplitter
from neuraxle.metaopt.callbacks import ScoringCallback
from sklearn.metrics import accuracy_score  # assuming scikit-learn's accuracy metric here

auto_ml = AutoML(
    pipeline=pipeline,
    validation_splitter=ValidationSplitter(test_size=0.20),
    scoring_callback=ScoringCallback(accuracy_score, higher_score_is_better=True),
    n_trials=100,
    epochs=30,
    hyperparams_repository=HyperparamsJSONRepository(cache_folder='cache'),
    refit_trial=True
)
auto_ml.fit(data_inputs, expected_outputs)

See the documentation page Introduction to Automatic Hyperparameter Tuning for more information.

Serializing your pipeline

To deploy your models to production, you need to serialize them so that they can be loaded and executed later. But you might need to save the fitted state of multiple models or pipeline steps, and it can get complicated pretty quickly.

Most steps can be serialized with joblib directly, but there are times when this is not possible. To solve this problem, use a list of savers for each of your pipeline steps. Each saver can save its own part of the step and strip it of the things that make it unserializable.
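As a rough sketch of the idea (this assumes a BaseSaver interface with save_step, can_load and load_step methods; refer to the Step Saving And Lifecycle page for the actual API), a saver for a step holding a hypothetical framework-specific model might look like this:

from neuraxle.base import BaseSaver, BaseStep, ExecutionContext

class ModelSaver(BaseSaver):
    """Hypothetical saver sketch: persists the heavy model weights on their own,
    then strips them from the step so that the rest of the step stays picklable."""

    def save_step(self, step: BaseStep, context: ExecutionContext) -> BaseStep:
        # Save the unserializable part with its own format...
        step.model.save_weights('model_weights.h5')  # hypothetical model API
        # ...then strip it from the step so that what remains can be serialized with joblib.
        step.model = None
        return step

    def can_load(self, step: BaseStep, context: ExecutionContext) -> bool:
        return True

    def load_step(self, step: BaseStep, context: ExecutionContext) -> BaseStep:
        # Rebuild the stripped part and reload its weights.
        step.model = step.create_model()  # hypothetical factory method on the step
        step.model.load_weights('model_weights.h5')
        return step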

See how you can easily set custom savers for each step:

from neuraxle.pipeline import Pipeline

pipeline = Pipeline([
    ...
    Model().set_savers([
        # save model and strip the non serializable parts
        ModelSaver()
    ])
    ...
])

pipeline.save('cache')

See the documentation page Step Saving And Lifecycle for more information.

Conclusion

Now that is amazing. Your dirty little code was refactored using some design patterns, and now it is so modular that it was easy to parallelize it and to do funky things such as lazy-loading the data in mini-batches. This way, your machine learning pipeline is also ready for deep learning. Thanks for getting this far.

Putting everything together, you get the following code:

from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.steps.flow import TrainOnlyWrapper
from neuraxle.distributed.streaming import SequentialQueuedPipeline
from neuraxle.metaopt.auto_ml import AutoML, HyperparamsJSONRepository, ValidationSplitter
from neuraxle.metaopt.callbacks import ScoringCallback
from neuraxle.steps.data import DataShuffler
from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace
from neuraxle.hyperparams.distributions import RandInt
from neuraxle.steps.column_transformer import ColumnTransformer
from sklearn.metrics import accuracy_score  # assuming scikit-learn's accuracy metric here

pipeline = Pipeline([
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        ConvertIDsToLoadedData(),
        ColumnTransformer([
            (range(0, 2), DateToCosineEncoder()),
            (3, CategoricalEnum(categories_count=5, starts_at_zero=True)),
        ]),
        Normalizer(),
        Model().set_savers([ModelSaver()]).set_hyperparams(HyperparameterSamples({
            'hidden_dim': 12,
        })).set_hyperparams_space(HyperparameterSpace({
            'hidden_dim': RandInt(6, 750),
        }))
    ], batch_size=128)
])

auto_ml = AutoML(
    pipeline=pipeline,
    validation_splitter=ValidationSplitter(test_size=0.20),
    scoring_callback=ScoringCallback(accuracy_score, higher_score_is_better=True),
    n_trials=100,
    epochs=30,
    hyperparams_repository=HyperparamsJSONRepository(cache_folder='cache'),
    refit_trial=True
)

auto_ml.fit(data_inputs, expected_outputs)