How to Trigger the Feast Workflow using FlyteRemote

The goal of this notebook is to train a simple Gaussian Naive Bayes model using sklearn on a modified Horse-Colic dataset from UCI.

The model aims to classify whether a horse's lesion is surgical or not.

Let’s get started!

Set the AWS environment variables before importing Flytekit.

import os

os.environ["FLYTE_AWS_ENDPOINT"] = os.environ["FEAST_S3_ENDPOINT_URL"] = "http://localhost:30084/"
os.environ["FLYTE_AWS_ACCESS_KEY_ID"] = os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["FLYTE_AWS_SECRET_ACCESS_KEY"] = os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"

01. Register the code

The actual workflow code is auto-documented and rendered using Sphinx here. We’ve used Flytekit to express the pipeline in pure Python.

You can use FlyteConsole to launch, monitor, and introspect Flyte executions. Here, however, let’s use flytekit.remote to interact with the Flyte backend programmatically.

from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

# The `for_sandbox` method instantiates a connection to the demo cluster.
remote = FlyteRemote(
    config=Config.for_sandbox(),
    default_project="flytesnacks",
    default_domain="development"
)
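
NOTE: for_sandbox targets the local demo cluster. If you’re pointing at a real Flyte deployment instead, Config.auto can build the configuration from a config file; a minimal sketch (the file path below is illustrative):

# Hypothetical alternative for a non-sandbox cluster; the config path is illustrative.
remote = FlyteRemote(
    config=Config.auto(config_file="~/.flyte/config.yaml"),
    default_project="flytesnacks",
    default_domain="development",
)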

The register_script method can be used to register the workflow.

from flytekit.configuration import ImageConfig

from feast_workflow import feast_workflow

wf = remote.register_script(
    feast_workflow,
    image_config=ImageConfig.from_images(
        "ghcr.io/flyteorg/flytecookbook:feast_integration-latest"
    ),
    version="v2",
    source_path="../",
    module_name="feast_workflow",
)
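
register_script returns the registered entity, so a quick sanity check (a minimal sketch) is to print its identifier:

# Print the identifier of the freshly registered workflow.
print(f"Registered {wf.id.name} at version {wf.id.version}")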

02. Launch an execution

FlyteRemote provides convenient methods to retrieve a registered version of the pipeline from the remote server.

NOTE: It is possible to fetch a specific version of the workflow and trigger a launch for it, but let’s just get the latest.

lp = remote.fetch_launch_plan(name="feast_integration.feast_workflow.feast_workflow")
lp.id.version
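
If you do want to pin the exact version registered above, fetch_launch_plan also accepts a version argument; a minimal sketch:

# Hypothetical: fetch the specific version registered earlier instead of the latest.
lp_v2 = remote.fetch_launch_plan(
    name="feast_integration.feast_workflow.feast_workflow",
    version="v2",
)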

The execute method can be used to execute a Flyte entity — a launch plan in our case.

execution = remote.execute(
    lp,
    inputs={"num_features_univariate": 5},
    wait=True
)
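
wait=True blocks until the run finishes. If you’d rather not block the notebook, here is a sketch of launching asynchronously and waiting later (the timeout value is illustrative):

from datetime import timedelta

# Launch without blocking, then wait explicitly (or skip the wait and poll via sync).
execution = remote.execute(lp, inputs={"num_features_univariate": 5}, wait=False)
execution = remote.wait(execution, timeout=timedelta(minutes=30))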

03. Sync an execution

You can sync an execution to retrieve the workflow’s outputs. sync_nodes is set to True to retrieve the intermediate nodes’ outputs as well.

NOTE: It is possible to fetch an already-running or completed execution instead of launching a new one. Also, if you launch an execution with the same name, Flyte will respect that and not start a new execution!

from flytekit.models.core.execution import WorkflowExecutionPhase

synced_execution = remote.sync(execution, sync_nodes=True)
print(f"Execution {synced_execution.id.name} is in {WorkflowExecutionPhase.enum_to_string(synced_execution.closure.phase)} phase")

04. Retrieve the output

Fetch the model and the model prediction.

model = synced_execution.outputs["o0"]
prediction = synced_execution.outputs["o1"]
prediction

NOTE: The output model is available as a joblib-serialized file, which can be downloaded and loaded locally.

model
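
To use the model outside Flyte, download the underlying file and load it with joblib. A sketch, assuming the output exposes FlyteFile-like semantics (the download() call and variable names below are illustrative):

import joblib

# Assumption: the model output behaves like a FlyteFile.
local_model_path = model.download()  # fetch the serialized model to local disk
loaded_model = joblib.load(local_model_path)
loaded_model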

Fetch the repo_config.

repo_config = synced_execution.node_executions["n0"].outputs["o0"]

05. Generate predictions

Re-use the predict function from the workflow to generate predictions — Flytekit will automatically manage the IO for you!

Load features from the online feature store

import os

from feast_workflow import predict, FEAST_FEATURES, retrieve_online

inference_point = retrieve_online(
    repo_config=repo_config,
    online_store=synced_execution.node_executions["n4"].outputs["o0"],
    data_point=533738,
)
inference_point

Generate a prediction

predict(model_ser=model, features=inference_point)