EDA and Feature Engineering in Jupyter Notebook and Modeling in a Flyte Task
In this example, we will implement a simple pipeline that takes hyperparameters, performs EDA and feature engineering (step 1: EDA and feature engineering in a Jupyter notebook), and measures the Gradient Boosting model's performance using mean absolute error (MAE) (step 2: modeling in a Flyte task).
First, let's import the libraries we will use in this example.
import os
import pathlib
from dataclasses import dataclass
import numpy as np
import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import Resources, kwtypes, task, workflow
from flytekitplugins.papermill import NotebookTask
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import RobustScaler
We define a dataclass to store the hyperparameters of the Gradient Boosting Regressor.
@dataclass_json
@dataclass
class Hyperparameters(object):
    n_estimators: int = 150
    max_depth: int = 3
    max_features: str = "sqrt"
    min_samples_split: int = 4
    random_state: int = 2
    nfolds: int = 10
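Because of the @dataclass_json decorator, these hyperparameters serialize to and from JSON, which is how Flyte passes structured values between tasks. As a quick illustration (not part of the pipeline), the round-trip looks like this:
# Illustrative only: JSON round-trip enabled by @dataclass_json.
params = Hyperparameters(n_estimators=200)
serialized = params.to_json()  # a JSON string containing all fields
restored = Hyperparameters.from_json(serialized)
assert restored == params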
We define a NotebookTask to run the Jupyter notebook. This notebook returns dummified_data and dataset as its outputs.
Note
dummified_data is used in this example, and dataset is used in the upcoming example.
nb = NotebookTask(
    name="eda-feature-eng-nb",
    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "supermarket_regression_1.ipynb"),
    outputs=kwtypes(dummified_data=pd.DataFrame, dataset=str),
    requests=Resources(mem="500Mi"),
)
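For the outputs declared above to be captured, the notebook itself has to record them: with the papermill plugin, the notebook calls record_outputs in a cell tagged outputs. A minimal sketch of what the final cell of supermarket_regression_1.ipynb might look like (the variable names dummified_df and dataset_path are assumptions, not taken from the actual notebook):
# Sketch of the notebook's final cell (the cell is tagged "outputs").
# dummified_df and dataset_path are assumed variable names.
from flytekitplugins.papermill import record_outputs

record_outputs(dummified_data=dummified_df, dataset=dataset_path)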
Next, we define a cross_validate function and a modeling task to compute the MAE score of the Gradient Boosting Regressor on the data.
def cross_validate(model, nfolds, feats, targets):
    # neg_mean_absolute_error returns negated errors, so flip the sign to get the MAE.
    score = -1 * (cross_val_score(model, feats, targets, cv=nfolds, scoring="neg_mean_absolute_error"))
    return np.mean(score)
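To see cross_validate on its own, here is a toy run on synthetic data (the data and the LinearRegression model are illustrative, not part of this pipeline):
from sklearn.linear_model import LinearRegression

# Synthetic regression data: 100 samples, 3 features, known linear relationship.
X_toy = np.random.rand(100, 3)
y_toy = X_toy @ np.array([1.0, 2.0, 3.0]) + 0.1 * np.random.rand(100)

# Mean MAE across 5 folds; small for this nearly-linear data.
print(cross_validate(LinearRegression(), 5, X_toy, y_toy))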
@task
def modeling(
    dataset: pd.DataFrame,
    hyperparams: Hyperparameters,
) -> float:
    # Separate the target column from the features.
    y_target = dataset["Product_Supermarket_Sales"].tolist()
    dataset.drop(["Product_Supermarket_Sales"], axis=1, inplace=True)

    X_train, X_test, y_train, _ = train_test_split(dataset, y_target, test_size=0.3)

    # Scale the features; RobustScaler is less sensitive to outliers than StandardScaler.
    # The scaler is fit only on the training split to avoid leaking test data.
    scaler = RobustScaler()
    scaler.fit(X_train)
    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)

    gb_model = GradientBoostingRegressor(
        n_estimators=hyperparams.n_estimators,
        max_depth=hyperparams.max_depth,
        max_features=hyperparams.max_features,
        min_samples_split=hyperparams.min_samples_split,
        random_state=hyperparams.random_state,
    )

    # Cross-validate on the training split and return the mean MAE.
    return cross_validate(gb_model, hyperparams.nfolds, X_train, y_train)
We define a workflow to run the notebook and the modeling task.
@workflow
def notebook_wf(hyperparams: Hyperparameters = Hyperparameters()) -> float:
    output = nb()
    mae_score = modeling(dataset=output.dummified_data, hyperparams=hyperparams)
    return mae_score
We can now run the notebook and the modeling task locally.
if __name__ == "__main__":
    print(notebook_wf())
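Since the workflow takes a Hyperparameters input with a default value, we can also override individual hyperparameters when running locally, for example:
if __name__ == "__main__":
    # Override a couple of defaults; all other fields keep their default values.
    print(notebook_wf(hyperparams=Hyperparameters(n_estimators=200, nfolds=5)))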