Imperative Workflows

Workflows are typically created by decorating a function with the @workflow decorator. At compile time, Flyte runs through the body of the function and uses the calls to the underlying tasks to determine and record the workflow structure. This is the declarative style, and it makes sense when a human is writing the workflow by hand.
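To make the idea of "running the body to record structure" concrete, here is a toy sketch in plain Python. It is not flytekit's implementation: toy_task, trace, and Placeholder are hypothetical names invented for this illustration. It shows only the general technique of swapping real execution for call recording.

```python
from typing import Callable, List, Tuple

# Toy model only: this is NOT how flytekit is implemented.
_recorded: List[Tuple[str, tuple]] = []
_tracing = False


class Placeholder:
    """Stands in for a value that does not exist yet at trace time."""

    def __init__(self, producer: str):
        self.producer = producer

    def __repr__(self) -> str:
        return f"<output of {self.producer}>"


def toy_task(fn: Callable) -> Callable:
    """Hypothetical decorator: record the call while tracing, run it otherwise."""

    def wrapper(*args):
        if _tracing:
            # Record the call instead of executing the task body.
            _recorded.append((fn.__name__, args))
            return Placeholder(fn.__name__)
        return fn(*args)

    return wrapper


def trace(workflow_fn: Callable, *input_names: str) -> List[Tuple[str, tuple]]:
    """Run the function body once with placeholders and return the recorded calls."""
    global _tracing
    _recorded.clear()
    _tracing = True
    try:
        workflow_fn(*(Placeholder(f"input:{name}") for name in input_names))
    finally:
        _tracing = False
    return list(_recorded)


@toy_task
def greet(a):
    return a + " world"


@toy_task
def shout(a):
    return a.upper()


def my_wf(x):
    return shout(greet(x))


# Tracing yields the call structure without running any task body.
structure = trace(my_wf, "x")
for name, args in structure:
    print(name, args)
```

Outside of tracing, my_wf("hello") simply runs the two functions as ordinary Python.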

When workflows are constructed programmatically, an imperative style makes more sense. For example, the tasks may already be defined and their order and dependencies specified in some text format (perhaps you’re converting from a legacy system), so your goal is simply to orchestrate those tasks.
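As a rough illustration of that scenario, the sketch below parses a dependency listing and derives an order in which tasks could be added to a workflow. The "name: dep1, dep2" line format and the task names are assumptions made up for this example, not a format Flyte defines, and the code uses only the Python standard library (graphlib, Python 3.9+).

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical legacy spec: one "task: comma-separated dependencies" per line.
LEGACY_SPEC = """
extract:
transform: extract
load: transform, extract
"""


def parse_spec(text: str) -> Dict[str, Set[str]]:
    """Map each task name to the set of task names it depends on."""
    deps: Dict[str, Set[str]] = {}
    for line in text.strip().splitlines():
        name, _, rest = line.partition(":")
        deps[name.strip()] = {d.strip() for d in rest.split(",") if d.strip()}
    return deps


def build_order(deps: Dict[str, Set[str]]) -> List[str]:
    """Return the task names so that every task follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())


deps = parse_spec(LEGACY_SPEC)
order = build_order(deps)
print(order)
```

A program could then walk such an ordering and add each task to an imperative workflow, wiring the inputs from the nodes created earlier.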

import typing

from flytekit import Workflow, task

Assume we have the following tasks, which stand in for more complicated ones. t1 has simple scalar I/O, t2 is a pure side-effect task (we typically don’t recommend these, but they are sometimes unavoidable), and t3 takes a list as input.

@task
def t1(a: str) -> str:
    return a + " world"


@task
def t2():
    print("side effect")


@task
def t3(a: typing.List[str]) -> str:
    """
    This is a pedagogical demo that happens to do a reduction step. Flyte is a higher-order orchestration
    platform, not a map-reduce framework, and is not meant to supplant Spark et al.
    """
    return ",".join(a)

Start by creating an imperative-style workflow, which flytekit exposes under the alias Workflow.

wb = Workflow(name="my.imperative.workflow.example")

Inputs have to be added to the workflow before they can be used. Add them by specifying the name and the type.

wb.add_workflow_input("in1", str)

Next, add a task and pass in the workflow-level input.

node_t1 = wb.add_entity(t1, a=wb.inputs["in1"])

Create a workflow output linked to the output of that task.

wb.add_workflow_output("output_from_t1", node_t1.outputs["o0"])

To add a task that has no inputs or outputs, just add the entity. We don’t need to capture the resulting node because we have no use for it.

wb.add_entity(t2)

We can also pass a list to a task. Creating a workflow input also returns an object that can be used as an alternative way of linking workflow inputs. Here, t3 uses both workflow inputs.

wf_in2 = wb.add_workflow_input("in2", str)
node_t3 = wb.add_entity(t3, a=[wb.inputs["in1"], wf_in2])

You can also create a workflow output as a list from multiple task outputs.

wb.add_workflow_output(
    "output_list",
    [node_t1.outputs["o0"], node_t3.outputs["o0"]],
    python_type=typing.List[str],
)


if __name__ == "__main__":
    print(wb)
    print(wb(in1="hello", in2="foo"))
