Imperative workflows

Tags: Basic

Workflows are commonly created by applying the @workflow decorator to Python functions. During compilation, this involves processing the function’s body and utilizing subsequent calls to underlying tasks to establish and record the workflow structure. This approach is known as declarative and is suitable when manually drafting the workflow.

However, in cases where workflows are constructed programmatically, an imperative style is more appropriate. For instance, if tasks have been defined already, their sequence and dependencies might have been specified in textual form (perhaps during a transition from a legacy system). In such scenarios, you want to orchestrate these tasks. This is where Flyte’s imperative workflows come into play, allowing you to programmatically construct workflows.

Note

To clone and run the example code on this page, see the Flytesnacks repo.

To begin, import the necessary dependencies:

basics/imperative_workflow.py
from flytekit import Workflow

We import the slope and intercept tasks from the workflow.py file:

basics/imperative_workflow.py
from .workflow import intercept, slope

Create an imperative workflow:

basics/imperative_workflow.py
imperative_wf = Workflow(name="imperative_workflow")

Add the workflow inputs to the imperative workflow:

basics/imperative_workflow.py
imperative_wf.add_workflow_input("x", list[int])
imperative_wf.add_workflow_input("y", list[int])

Note

If you want to assign default values to the workflow inputs, you can create a launch plan.

Add the tasks that need to be triggered from within the workflow:

basics/imperative_workflow.py
node_t1 = imperative_wf.add_entity(slope, x=imperative_wf.inputs["x"], y=imperative_wf.inputs["y"])
node_t2 = imperative_wf.add_entity(
    intercept, x=imperative_wf.inputs["x"], y=imperative_wf.inputs["y"], slope=node_t1.outputs["o0"]
)

Lastly, add the workflow output:

basics/imperative_workflow.py
imperative_wf.add_workflow_output("wf_output", node_t2.outputs["o0"])

You can execute the workflow locally as follows:

basics/imperative_workflow.py
if __name__ == "__main__":
    print(f"Running imperative_wf() {imperative_wf(x=[-3, 0, 3], y=[7, 4, -2])}")

Note

You also have the option to provide a list of inputs and retrieve a list of outputs from the workflow:

wf_input_y = imperative_wf.add_workflow_input("y", list[str])
node_t3 = wf.add_entity(some_task, a=[wf.inputs["x"], wf_input_y])
wf.add_workflow_output(
    "list_of_outputs",
    [node_t1.outputs["o0"], node_t2.outputs["o0"]],
    python_type=list[str],
)