Creating a pipeline

This section covers how to create a pipeline from a set of nodes, which are Python functions, as described in more detail in the nodes and pipelines user guide documentation.

  1. As you draft experimental code, you can use a Jupyter Notebook or IPython session. If you include docstrings to explain what your functions do, you can take advantage of auto-generated Sphinx documentation later on. Once you are happy with how you have written your node functions, you will run kedro jupyter convert --all (or kedro jupyter convert <filepath_to_my_notebook>) to export the code cells tagged as node into the src/kedro_tutorial/nodes/ folder as a .py file.
  2. When a node is ready, add it to the pipeline in src/kedro_tutorial/pipeline.py, specifying its inputs and outputs.

Node basics

You previously registered the raw datasets for your Kedro project, so you can now start processing the data and preparing it for model building. Let’s pre-process two of the datasets (companies.csv and shuttles.xlsx) by creating Python functions for each.

Create a file called data_engineering.py inside your nodes folder and add the following functions:

import pandas as pd


def _is_true(x):
    return x == "t"


def _parse_percentage(x):
    if isinstance(x, str):
        return float(x.replace("%", "")) / 100
    return float("NaN")


def _parse_money(x):
    return float(x.replace("$", "").replace(",", ""))


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the data for companies.

        Args:
            companies: Source data.
        Returns:
            Preprocessed data.

    """

    companies["iata_approved"] = companies["iata_approved"].apply(_is_true)

    companies["company_rating"] = companies["company_rating"].apply(_parse_percentage)

    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the data for shuttles.

        Args:
            shuttles: Source data.
        Returns:
            Preprocessed data.

    """
    shuttles["d_check_complete"] = shuttles["d_check_complete"].apply(_is_true)

    shuttles["moon_clearance_complete"] = shuttles["moon_clearance_complete"].apply(
        _is_true
    )

    shuttles["price"] = shuttles["price"].apply(_parse_money)

    return shuttles

Assemble nodes into a pipeline

Now you have functions which take one dataframe and output a pre-processed version of that dataframe. Next you should add these functions as nodes into the pipeline in pipeline.py, so the create_pipeline() function looks as follows:

def create_pipeline(**kwargs):
    """Create the project's pipeline.

    Args:
        kwargs: Ignore any additional arguments added in the future.

    Returns:
        Pipeline: The resulting pipeline.

    """
    pipeline = Pipeline(
        [
            node(preprocess_companies, "companies", "preprocessed_companies", name="preprocess1"),
            node(preprocess_shuttles, "shuttles", "preprocessed_shuttles", name="preprocess2"),
        ]
    )

    return pipeline

You will also need to import node and Pipeline, as well as your functions, by adding the following to the beginning of the pipeline.py file:

from kedro.pipeline import node, Pipeline
from kedro_tutorial.nodes.data_engineering import (
    preprocess_companies,
    preprocess_shuttles,
)

As you develop your nodes, you can test them to see if they work as expected. As an example, run the following command in your terminal window:

kedro run --node=preprocess1

You should see output similar to the following:

2019-08-19 10:44:33,112 - root - INFO - ** Kedro project kedro-tutorial
2019-08-19 10:44:33,123 - kedro.io.data_catalog - INFO - Loading data from `companies` (CSVLocalDataSet)...
2019-08-19 10:44:33,161 - kedro.pipeline.node - INFO - Running node: preprocess1: preprocess_companies([companies]) -> [preprocessed_companies]
2019-08-19 10:44:33,206 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_companies` (CSVLocalDataSet)...
2019-08-19 10:44:33,471 - kedro.runner.sequential_runner - INFO - Completed 1 out of 1 tasks
2019-08-19 10:44:33,471 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

Now check that the entire pipeline runs without any errors by typing this in your terminal window:

kedro run

You should see output similar to the following:

kedro run

2019-08-19 10:50:39,950 - root - INFO - ** Kedro project kedro-tutorial
2019-08-19 10:50:39,957 - kedro.io.data_catalog - INFO - Loading data from `shuttles` (ExcelLocalDataSet)...
2019-08-19 10:50:48,521 - kedro.pipeline.node - INFO - Running node: preprocess2: preprocess_shuttles([shuttles]) -> [preprocessed_shuttles]
2019-08-19 10:50:48,587 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_shuttles` (CSVLocalDataSet)...
2019-08-19 10:50:49,133 - kedro.runner.sequential_runner - INFO - Completed 1 out of 2 tasks
2019-08-19 10:50:49,133 - kedro.io.data_catalog - INFO - Loading data from `companies` (CSVLocalDataSet)...
2019-08-19 10:50:49,168 - kedro.pipeline.node - INFO - Running node: preprocess1: preprocess_companies([companies]) -> [preprocessed_companies]
2019-08-19 10:50:49,212 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_companies` (CSVLocalDataSet)...
2019-08-19 10:50:49,458 - kedro.runner.sequential_runner - INFO - Completed 2 out of 2 tasks
2019-08-19 10:50:49,459 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

Persisting pre-processed data

Each of the two newly added data preprocessing nodes outputs a new dataset: preprocessed_companies and preprocessed_shuttles respectively. Node inputs and outputs are used by the pipeline to determine interdependencies between the nodes and, hence, their execution order.

When Kedro ran the pipeline, it determined that those datasets were not registered in the data catalog (conf/base/catalog.yml). If a dataset is not registered, Kedro stores it in memory as a Python object using the MemoryDataSet class. Once all nodes depending on it have been executed, a MemoryDataSet is cleared and its memory released by the Python garbage collector.
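
If you are curious what a MemoryDataSet looks like on its own, the following minimal sketch (using the kedro.io API) illustrates the behaviour described above; it is for illustration only and is not part of the tutorial project:

from kedro.io import MemoryDataSet

# A MemoryDataSet simply wraps a Python object held in memory;
# nothing is written to disk.
dataset = MemoryDataSet(data={"a": 1, "b": 2})

dataset.load()    # returns the stored object: {"a": 1, "b": 2}
dataset.exists()  # True, since data was provided at construction time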

If you prefer, you can persist any preprocessed data by adding the following to the conf/base/catalog.yml file:

preprocessed_companies:
  type: CSVLocalDataSet
  filepath: data/02_intermediate/preprocessed_companies.csv

preprocessed_shuttles:
  type: CSVLocalDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.csv

By doing so you explicitly declare that CSVLocalDataSet should be used instead of MemoryDataSet. CSVLocalDataSet will save the data as a CSV file to the local filepath specified. There is no need to change any code in your preprocessing functions to accommodate this change. DataCatalog will take care of saving those datasets automatically the next time you run the pipeline:

kedro run

CSVLocalDataSet is chosen for its simplicity, but you can choose any other available dataset implementation class to save the data, for example, to a database table, cloud storage (like AWS S3, Azure Blob Storage, etc.) and others. If you cannot find the dataset implementation you need, you can easily implement your own as you already did earlier and share it with the world by contributing back to Kedro!
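
As a rough illustration of what such catalog entries correspond to in code, the YAML above is broadly equivalent to constructing the datasets with the kedro.io API directly; this sketch is for illustration only and nothing like it needs to be added to the project:

import pandas as pd

from kedro.io import CSVLocalDataSet, DataCatalog

# Roughly what Kedro builds for you from the catalog.yml entries above.
io = DataCatalog(
    {
        "preprocessed_companies": CSVLocalDataSet(
            filepath="data/02_intermediate/preprocessed_companies.csv"
        ),
        "preprocessed_shuttles": CSVLocalDataSet(
            filepath="data/02_intermediate/preprocessed_shuttles.csv"
        ),
    }
)

# Saving writes a CSV to the configured filepath; loading reads it back.
io.save("preprocessed_companies", pd.DataFrame({"id": [1], "iata_approved": [True]}))
reloaded = io.load("preprocessed_companies")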

Creating a master table

Next, add a function that joins the three dataframes into a single master table. Write it in a cell of your notebook as follows:

import pandas as pd

def create_master_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a master table.

        Args:
            shuttles: Preprocessed data for shuttles.
            companies: Preprocessed data for companies.
            reviews: Source data for reviews.
        Returns:
            Master table.

    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")

    with_companies = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )

    master_table = with_companies.drop(["shuttle_id", "company_id"], axis=1)
    master_table = master_table.dropna()
    return master_table

Working in a Jupyter notebook

To test that the function joins all tables into a master table as expected, load the three dataframes in a cell of the Jupyter notebook and call it:

preprocessed_shuttles = context.catalog.load("preprocessed_shuttles")
preprocessed_companies = context.catalog.load("preprocessed_companies")
reviews = context.catalog.load("reviews")

master = create_master_table(preprocessed_shuttles, preprocessed_companies, reviews)
master.head()

Extending the project’s code

Having tested that all is working with the master table, it is now time to add the code you’ve worked on to the Spaceflights project code. First, add the create_master_table() function from the snippet above to data_engineering.py (you do not need to copy the import statement import pandas as pd).

Then you should add it to the pipeline in pipeline.py by adding the node as follows:

node(
    create_master_table,
    ["preprocessed_shuttles", "preprocessed_companies", "reviews"],
    "master_table"
),

By adding this code to the project, you are telling Kedro that the function create_master_table should be called with the data loaded from the datasets preprocessed_shuttles, preprocessed_companies and reviews, and that the output should be saved to the dataset master_table.
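
To make the data flow explicit, here is a simplified, purely conceptual sketch of what running that node amounts to; the real work is done by the runner and the DataCatalog, so nothing like this needs to be written in the project:

# In a Kedro Jupyter/IPython session, `context.catalog` gives you the DataCatalog.
catalog = context.catalog

shuttles = catalog.load("preprocessed_shuttles")
companies = catalog.load("preprocessed_companies")
reviews = catalog.load("reviews")

# The node's output name tells Kedro where to save the function's return value.
catalog.save("master_table", create_master_table(shuttles, companies, reviews))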

You will also need to add an import statement for create_master_table at the top of the file:

from kedro_tutorial.nodes.data_engineering import (
    preprocess_companies,
    preprocess_shuttles,
    create_master_table,
)

If you want your data to be saved to file rather than used in-memory, you also need to add an entry to the catalog.yml file like this:

master_table:
  type: CSVLocalDataSet
  filepath: data/03_primary/master_table.csv

You may want to test that all is working with your code at this point:

kedro run

2019-08-19 10:55:47,534 - root - INFO - ** Kedro project kedro-tutorial
2019-08-19 10:55:47,541 - kedro.io.data_catalog - INFO - Loading data from `shuttles` (ExcelLocalDataSet)...
2019-08-19 10:55:55,670 - kedro.pipeline.node - INFO - Running node: preprocess2: preprocess_shuttles([shuttles]) -> [preprocessed_shuttles]
2019-08-19 10:55:55,736 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_shuttles` (CSVLocalDataSet)...
2019-08-19 10:55:56,284 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2019-08-19 10:55:56,284 - kedro.io.data_catalog - INFO - Loading data from `companies` (CSVLocalDataSet)...
2019-08-19 10:55:56,318 - kedro.pipeline.node - INFO - Running node: preprocess1: preprocess_companies([companies]) -> [preprocessed_companies]
2019-08-19 10:55:56,361 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_companies` (CSVLocalDataSet)...
2019-08-19 10:55:56,610 - kedro.runner.sequential_runner - INFO - Completed 2 out of 3 tasks
2019-08-19 10:55:56,610 - kedro.io.data_catalog - INFO - Loading data from `preprocessed_shuttles` (CSVLocalDataSet)...
2019-08-19 10:55:56,715 - kedro.io.data_catalog - INFO - Loading data from `preprocessed_companies` (CSVLocalDataSet)...
2019-08-19 10:55:56,750 - kedro.io.data_catalog - INFO - Loading data from `reviews` (CSVLocalDataSet)...
2019-08-19 10:55:56,812 - kedro.pipeline.node - INFO - Running node: create_master_table([preprocessed_companies,preprocessed_shuttles,reviews]) -> [master_table]
2019-08-19 10:55:58,679 - kedro.io.data_catalog - INFO - Saving data to `master_table` (CSVLocalDataSet)...
2019-08-19 10:56:09,991 - kedro.runner.sequential_runner - INFO - Completed 3 out of 3 tasks
2019-08-19 10:56:09,991 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

Working with multiple pipelines

Having merged three input datasets to create a master table, you are now ready to make another pipeline for a price prediction model. It will be called the data science pipeline.

For this example, we will use a LinearRegression implementation from the scikit-learn library.

You can start by updating the dependencies in src/requirements.txt with the following:

scikit-learn==0.20.2

You can find out more about requirements files in the pip documentation.

Then, from within the project directory, run:

kedro install

Next, create a file src/kedro_tutorial/nodes/price_prediction.py and add the following code to it:

import logging
from typing import Dict, List

import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, parameters: Dict) -> List:
    """Splits data into training and test sets.

        Args:
            data: Source data.
            parameters: Parameters defined in parameters.yml.

        Returns:
            A list containing split data.

    """
    X = data[
        [
            "engines",
            "passenger_capacity",
            "crew",
            "d_check_complete",
            "moon_clearance_complete",
        ]
    ].values
    y = data["price"].values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )

    return [X_train, X_test, y_train, y_test]


def train_model(X_train: np.ndarray, y_train: np.ndarray) -> LinearRegression:
    """Train the linear regression model.

        Args:
            X_train: Training data of independent features.
            y_train: Training data for price.

        Returns:
            Trained model.

    """
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    return regressor


def evaluate_model(regressor: LinearRegression, X_test: np.ndarray, y_test: np.ndarray):
    """Calculate the coefficient of determination and log the result.

        Args:
            regressor: Trained model.
            X_test: Testing data of independent features.
            y_test: Testing data for price.

    """
    y_pred = regressor.predict(X_test)
    score = r2_score(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f.", score)

Add the following to conf/base/parameters.yml:

test_size: 0.2
random_state: 3

These parameters are fed into the DataCatalog when the pipeline is executed. Alternatively, individual parameters from parameters.yml can be referenced in node inputs using the params: prefix. For example, you could pass the test_size and random_state parameters as follows:


def split_data(data: pd.DataFrame, test_size: float, random_state: int) -> List:
    """
    The function now accepts `test_size` and `random_state` rather than `parameters: Dict`.
    """
    X = data[
        [
            "engines",
            "passenger_capacity",
            "crew",
            "d_check_complete",
            "moon_clearance_complete",
        ]
    ].values
    y = data["price"].values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    return [X_train, X_test, y_train, y_test]

# ...
ds_pipeline = Pipeline(
    [
        node(
            split_data,
            ["master_table", "params:test_size", "params:random_state"],
            ["X_train", "X_test", "y_train", "y_test"],
        )
    ]
)

Next, register the dataset that will be used to save the trained model, by adding the following definition to conf/base/catalog.yml:

regressor:
  type: PickleLocalDataSet
  filepath: data/06_models/regressor.pickle
  versioned: true

Note: Versioning is enabled for regressor, which means that the pickled output of the regressor will be versioned and saved every time the pipeline is run. This allows us to keep the history of the models built using this pipeline. See the details in the Versioning section of the User Guide.
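
If you later want to inspect a saved model, note that loading a versioned dataset without specifying a version returns the most recently saved one. For example, in a Kedro Jupyter or IPython session (a sketch, assuming the pipeline has been run at least once so a version exists):

# Loads the latest saved version of the pickled regressor by default.
regressor = context.catalog.load("regressor")

# Each run writes a new timestamped version underneath the configured filepath,
# e.g. data/06_models/regressor.pickle/<timestamp>/regressor.pickle
print(regressor)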

Now create a pipeline for the price prediction model. In src/kedro_tutorial/pipeline.py, add an extra import statement at the top of the file as follows:

from kedro_tutorial.nodes.price_prediction import split_data, train_model, evaluate_model

Then add a separate pipeline by replacing the code in create_pipeline() as follows:

def create_pipeline(**kwargs):
    """Create the project's pipeline.

    Args:
        kwargs: Ignore any additional arguments added in the future.

    Returns:
        Pipeline: The resulting pipeline.

    """
    de_pipeline = Pipeline(
        [
            node(preprocess_companies, "companies", "preprocessed_companies", name="preprocess1"),
            node(preprocess_shuttles, "shuttles", "preprocessed_shuttles", name="preprocess2"),
            node(
                create_master_table,
                ["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                "master_table",
            ),
        ]
    )

    ds_pipeline = Pipeline(
        [
            node(
                split_data,
                ["master_table", "parameters"],
                ["X_train", "X_test", "y_train", "y_test"],
            ),
            node(train_model, ["X_train", "y_train"], "regressor"),
            node(evaluate_model, ["regressor", "X_test", "y_test"], None),
        ]
    )

    return de_pipeline + ds_pipeline

The first node of the ds_pipeline outputs four objects: X_train, X_test, y_train and y_test, which are not registered in conf/base/catalog.yml. (If you recall, a dataset that is not specified in the catalog is automatically saved in memory using a MemoryDataSet.) Normally you would add dataset definitions for your model features to conf/base/catalog.yml, with the save location in data/04_features/.
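
For example, hypothetical catalog entries for the split data could look like the following; PickleLocalDataSet is used here as a sketch because the outputs are numpy arrays rather than dataframes, and none of this is required to complete the tutorial:

X_train:
  type: PickleLocalDataSet
  filepath: data/04_features/X_train.pickle

X_test:
  type: PickleLocalDataSet
  filepath: data/04_features/X_test.pickle

y_train:
  type: PickleLocalDataSet
  filepath: data/04_features/y_train.pickle

y_test:
  type: PickleLocalDataSet
  filepath: data/04_features/y_test.pickle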

The two pipelines are merged together in de_pipeline + ds_pipeline. Both pipelines will be executed when you invoke the following:

kedro run

You should see output similar to the following:

kedro run

2019-08-19 10:51:46,501 - root - INFO - ** Kedro project kedro-tutorial
2019-08-19 10:51:46,510 - kedro.io.data_catalog - INFO - Loading data from `companies` (CSVLocalDataSet)...
2019-08-19 10:51:46,547 - kedro.pipeline.node - INFO - Running node: preprocess1: preprocess_companies([companies]) -> [preprocessed_companies]
2019-08-19 10:51:46,597 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_companies` (CSVLocalDataSet)...
2019-08-19 10:51:46,906 - kedro.runner.sequential_runner - INFO - Completed 1 out of 6 tasks
2019-08-19 10:51:46,906 - kedro.io.data_catalog - INFO - Loading data from `shuttles` (ExcelLocalDataSet)...
2019-08-19 10:51:55,324 - kedro.pipeline.node - INFO - Running node: preprocess2: preprocess_shuttles([shuttles]) -> [preprocessed_shuttles]
2019-08-19 10:51:55,389 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_shuttles` (CSVLocalDataSet)...
2019-08-19 10:51:55,932 - kedro.runner.sequential_runner - INFO - Completed 2 out of 6 tasks
2019-08-19 10:51:55,932 - kedro.io.data_catalog - INFO - Loading data from `preprocessed_shuttles` (CSVLocalDataSet)...
2019-08-19 10:51:56,042 - kedro.io.data_catalog - INFO - Loading data from `preprocessed_companies` (CSVLocalDataSet)...
2019-08-19 10:51:56,078 - kedro.io.data_catalog - INFO - Loading data from `reviews` (CSVLocalDataSet)...
2019-08-19 10:51:56,139 - kedro.pipeline.node - INFO - Running node: create_master_table([preprocessed_companies,preprocessed_shuttles,reviews]) -> [master_table]
2019-08-19 10:51:58,037 - kedro.io.data_catalog - INFO - Saving data to `master_table` (CSVLocalDataSet)...
2019-08-19 10:52:09,133 - kedro.runner.sequential_runner - INFO - Completed 3 out of 6 tasks
2019-08-19 10:52:09,133 - kedro.io.data_catalog - INFO - Loading data from `master_table` (CSVLocalDataSet)...
2019-08-19 10:52:10,941 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2019-08-19 10:52:10,941 - kedro.pipeline.node - INFO - Running node: split_data([master_table,parameters]) -> [X_test,X_train,y_test,y_train]
2019-08-19 10:52:11,343 - kedro.io.data_catalog - INFO - Saving data to `X_train` (MemoryDataSet)...
2019-08-19 10:52:11,372 - kedro.io.data_catalog - INFO - Saving data to `X_test` (MemoryDataSet)...
2019-08-19 10:52:11,380 - kedro.io.data_catalog - INFO - Saving data to `y_train` (MemoryDataSet)...
2019-08-19 10:52:11,381 - kedro.io.data_catalog - INFO - Saving data to `y_test` (MemoryDataSet)...
2019-08-19 10:52:11,443 - kedro.runner.sequential_runner - INFO - Completed 4 out of 6 tasks
2019-08-19 10:52:11,443 - kedro.io.data_catalog - INFO - Loading data from `X_train` (MemoryDataSet)...
2019-08-19 10:52:11,472 - kedro.io.data_catalog - INFO - Loading data from `y_train` (MemoryDataSet)...
2019-08-19 10:52:11,474 - kedro.pipeline.node - INFO - Running node: train_model([X_train,y_train]) -> [regressor]
2019-08-19 10:52:11,704 - kedro.io.data_catalog - INFO - Saving data to `regressor` (PickleLocalDataSet)...
2019-08-19 10:52:11,776 - kedro.runner.sequential_runner - INFO - Completed 5 out of 6 tasks
2019-08-19 10:52:11,776 - kedro.io.data_catalog - INFO - Loading data from `regressor` (PickleLocalDataSet)...
2019-08-19 10:52:11,776 - kedro.io.data_catalog - INFO - Loading data from `X_test` (MemoryDataSet)...
2019-08-19 10:52:11,784 - kedro.io.data_catalog - INFO - Loading data from `y_test` (MemoryDataSet)...
2019-08-19 10:52:11,785 - kedro.pipeline.node - INFO - Running node: evaluate_model([X_test,regressor,y_test]) -> None
2019-08-19 10:52:11,830 - kedro_tutorial.nodes.price_prediction - INFO - Model has a coefficient R^2 of 0.456.
2019-08-19 10:52:11,869 - kedro.runner.sequential_runner - INFO - Completed 6 out of 6 tasks
2019-08-19 10:52:11,869 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

The de_pipeline will preprocess the data, and ds_pipeline will then create features, train and evaluate the model.

Note: The order in which you add the pipelines together is not significant and ds_pipeline + de_pipeline will result in the same pipeline, since Kedro automatically detects the correct execution order for all the nodes in the resulting pipeline.

Partial pipeline runs

In some cases, you may want to run only part of the pipeline. For example, you may need to run only the ds_pipeline to tune the hyperparameters of the price prediction model, skipping de_pipeline execution entirely. The most obvious way of doing this is to modify the Python code that defines the pipeline; however, this approach sometimes introduces errors.

A better way to run partial pipelines without changing your code is to use tags. All nodes within a pipeline can be tagged at once by passing a name to Pipeline(). Update the create_pipeline() code in pipeline.py one more time:

def create_pipeline(**kwargs):
    """Create the project's pipeline.

    Args:
        kwargs: Ignore any additional arguments added in the future.

    Returns:
        Pipeline: The resulting pipeline.

    """
    de_pipeline = Pipeline(
        [
            node(preprocess_companies, "companies", "preprocessed_companies", name="preprocess1"),
            node(preprocess_shuttles, "shuttles", "preprocessed_shuttles", name="preprocess2"),
            node(
                create_master_table,
                ["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                "master_table",
            ),
        ],
        name="de",
    )

    ds_pipeline = Pipeline(
        [
            node(
                split_data,
                ["master_table", "parameters"],
                ["X_train", "X_test", "y_train", "y_test"],
            ),
            node(train_model, ["X_train", "y_train"], "regressor"),
            node(evaluate_model, ["regressor", "X_test", "y_test"], None),
        ],
        name="ds",
    )

    return de_pipeline + ds_pipeline

To run a partial pipeline:

kedro run --tag=ds

This will skip the execution of the pipeline with tag de and only run the ds nodes (found within the ds_pipeline). If you want to run the whole pipeline:

kedro run

or:

kedro run --tag=ds --tag=de

Note: You can also attach tags to the individual nodes by passing the tags keyword to the node() function, and these are used in addition to any tags specified at the pipeline level. To tag a node as my-regressor-node:
node(
    train_model,
    ["X_train", "y_train"],
    "regressor",
    tags=["my-regressor-node"],
)

Using decorators for nodes and pipelines

In this section, you will learn about Kedro’s built-in decorators as well as how to create your own node and pipeline decorators.

Python decorators can be applied to Kedro nodes. Let’s walk through an example of building our own decorator for logging the execution time of each node.

Decorating the nodes

Logging the execution time for each node can be performed by creating a function and adding it to each node as a decorator.

In data_engineering.py, add the following decorator function near the top of the file:

from functools import wraps
from typing import Callable
import time
import logging

def log_running_time(func: Callable) -> Callable:
    """Decorator for logging node execution time.

        Args:
            func: Function to be executed.

        Returns:
            Decorator for logging the running time.

    """
    @wraps(func)
    def with_time(*args, **kwargs):
        log = logging.getLogger(__name__)
        t_start = time.time()
        result = func(*args, **kwargs)
        t_end = time.time()
        elapsed = t_end - t_start
        log.info("Running %r took %.2f seconds", func.__name__, elapsed)
        return result
    return with_time

Then apply it to each data engineering function by adding @log_running_time above the function definition:

@log_running_time
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    ...

@log_running_time
def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    ...

Then, if you run your pipeline from the command line, you should see output similar to the following:

kedro run

...
kedro_tutorial.nodes.data_engineering - INFO - Running 'preprocess_companies' took XXX seconds
...
kedro_tutorial.nodes.data_engineering - INFO - Running 'preprocess_shuttles' took XXX seconds

Decorating the pipeline

A decorator can also be applied to an entire pipeline rather than to each individual node. In src/kedro_tutorial/pipeline.py, update the imports from data_engineering.py as follows:

from kedro_tutorial.nodes.data_engineering import (
    preprocess_companies,
    preprocess_shuttles,
    create_master_table,
    log_running_time,
)

Then apply the decorator to both pipelines:

def create_pipeline(**kwargs):
    """Create the project's pipeline.

    Args:
        kwargs: Ignore any additional arguments added in the future.

    Returns:
        Pipeline: The resulting pipeline.

    """
    de_pipeline = Pipeline(
        [
            node(preprocess_companies, "companies", "preprocessed_companies", name="preprocess1"),
            node(preprocess_shuttles, "shuttles", "preprocessed_shuttles", name="preprocess2"),
            node(
                create_master_table,
                ["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                "master_table",
            ),
        ],
        name="de",
    ).decorate(log_running_time)

    ds_pipeline = Pipeline(
        [
            node(
                split_data,
                ["master_table", "parameters"],
                ["X_train", "X_test", "y_train", "y_test"],
            ),
            node(train_model, ["X_train", "y_train"], "regressor"),
            node(evaluate_model, ["regressor", "X_test", "y_test"], None),
        ],
        name="ds",
    ).decorate(log_running_time)

    return de_pipeline + ds_pipeline

This decorator is commonly used and Kedro already includes it as a built-in decorator called kedro.pipeline.decorators.log_time.

Another built-in decorator is kedro.pipeline.decorators.mem_profile, which will log the maximum memory usage of your node.
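
To use the built-in decorators instead of the custom one above, you would import them from kedro.pipeline.decorators and pass them to decorate() in the same way; a minimal sketch:

from kedro.pipeline import Pipeline, node
from kedro.pipeline.decorators import log_time, mem_profile

from kedro_tutorial.nodes.data_engineering import preprocess_companies

# Both decorators are applied to every node in the pipeline.
de_pipeline = Pipeline(
    [node(preprocess_companies, "companies", "preprocessed_companies", name="preprocess1")],
    name="de",
).decorate(log_time, mem_profile)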

Kedro runners

Having specified the data catalog and the pipeline, you are now ready to run the pipeline. There are two different runners you can specify:

  • SequentialRunner - runs your nodes sequentially; once a node has completed its task then the next one starts.
  • ParallelRunner - runs your nodes in parallel; independent nodes are able to run at the same time, allowing you to take advantage of multiple CPU cores.

By default, Kedro uses a SequentialRunner, which is instantiated when you execute kedro run from the command line. Switching to ParallelRunner is as simple as providing an additional flag when running the pipeline from the command line, as follows:

kedro run --parallel

ParallelRunner executes the pipeline nodes in parallel, and is more efficient when there are independent branches in your pipeline.

Note: ParallelRunner performs task parallelisation, which is different from data parallelisation as seen in PySpark.
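
The runners can also be used programmatically, for example from a script or a notebook, rather than via kedro run. Here is a minimal sketch, assuming pipeline is the Pipeline returned by create_pipeline() and catalog is the project's DataCatalog:

from kedro.runner import ParallelRunner, SequentialRunner

# Both runners share the same interface: run() executes the pipeline and
# returns any outputs that are not registered in the catalog.
runner = ParallelRunner()  # or SequentialRunner()
outputs = runner.run(pipeline, catalog)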