Workflows are typically created and specified by decorating a function with the
@workflow decorator. This will
run through the body of the function at compile time, using the subsequent calls of the underlying tasks to determine
and record the workflow structure. This is the declarative style and makes sense when a human is writing it up by hand.
For cases where workflows are constructed programmatically, an imperative style makes more sense. For example, when tasks have already been defined, their order and dependencies have been specified in text format of some kind (perhaps you’re converting from a legacy system), and your goal is to orchestrate those tasks.
import typing from flytekit import Workflow, task
Assume we have the following tasks, and they are meant to represent more complicated tasks.
t1 has simple scalar I/O,
a pure side effect task (though we typically don’t recommend these, they are inevitable), and
t3 takes in a list
as an input.
@task def t1(a: str) -> str: return a + " world" @task def t2(): print("side effect") @task def t3(a: typing.List[str]) -> str: """ This is a pedagogical demo that happens to do a reduction step. Flyte is higher-order orchestration platform, not a map-reduce framework and is not meant to supplant Spark et. al. """ return ",".join(a)
Start by creating an imperative style workflow, which is aliased to just
wb = Workflow(name="my.imperative.workflow.example")
Inputs have to be added to the workflow before they can be used. Add them by specifying the name and the type.
Next associate a task, and pass in the workflow level input.
node_t1 = wb.add_entity(t1, a=wb.inputs["in1"])
Create a workflow output linked to the output of that task.
To add a task that has no inputs or outputs, just add the entity. We don’t need to capture the resulting node because we have no use for it.
We can also pass in a list to a task. Also creating a workflow input returns an object that can be used as an
alternate way of linking workflow inputs. Here,
t3 uses both workflow inputs.
wf_in2 = wb.add_workflow_input("in2", str) node_t3 = wb.add_entity(t3, a=[wb.inputs["in1"], wf_in2])
You can also create a workflow input as a list from multiple task outputs
wb.add_workflow_output( "output_list", [node_t1.outputs["o0"], node_t3.outputs["o0"]], python_type=typing.List[str], ) if __name__ == "__main__": print(wb) print(wb(in1="hello", in2="foo"))
Total running time of the script: ( 0 minutes 0.000 seconds)