flytekit.workflow

flytekit.workflow(_workflow_function=None, failure_policy=None, interruptible=False)

This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.

Unlike a task, the function body of a workflow is evaluated at serialization time (that is, compile time). The entire structure of a task can be determined from its function signature alone, but the structure of a workflow is expressed by the body of the function, so flytekit has to run through the body to discover it. Local execution aside, the body is not evaluated again when the workflow runs on Flyte. Workflows should therefore not call non-Flyte entities: such calls run only once, at compile time, and not on each execution on the platform.
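The compile-time behaviour described above can be illustrated with a small toy model. This is only a sketch in plain Python and is not how flytekit is implemented: toy_task, compile_workflow, Graph, Node and Input are hypothetical names invented for the illustration. It shows the body being run once to record a graph, and a plain Python call inside the body happening only during that one pass.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class Input:
    """Marker standing in for a workflow input while the body is traced."""
    name: str


@dataclass
class Node:
    """One recorded task call: the function and where its inputs come from."""
    name: str
    fn: Callable[..., Any]
    inputs: Dict[str, Any]  # literals, Input markers, or upstream Nodes


class Graph:
    """Toy DAG: nodes in call order plus the value the body returned."""

    def __init__(self) -> None:
        self.nodes: List[Node] = []
        self.output: Any = None

    def run(self, **wf_inputs: Any) -> Any:
        results: Dict[int, Any] = {}

        def resolve(value: Any) -> Any:
            if isinstance(value, Node):
                return results[id(value)]
            if isinstance(value, Input):
                return wf_inputs[value.name]
            return value

        # Only the recorded task functions run here; the workflow body does not.
        for node in self.nodes:
            kwargs = {k: resolve(v) for k, v in node.inputs.items()}
            results[id(node)] = node.fn(**kwargs)
        return resolve(self.output)


_active_graphs: List[Graph] = []  # stack of graphs currently being traced


def toy_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Hypothetical decorator: calling the task records a node, not a result."""

    def record(**kwargs: Any) -> Node:
        node = Node(fn.__name__, fn, kwargs)
        _active_graphs[-1].nodes.append(node)
        return node

    return record


def compile_workflow(body: Callable[..., Any], input_names: List[str]) -> Graph:
    """Run the body exactly once with placeholder inputs to build the graph."""
    graph = Graph()
    _active_graphs.append(graph)
    try:
        graph.output = body(**{name: Input(name) for name in input_names})
    finally:
        _active_graphs.pop()
    return graph


side_effects: List[str] = []


@toy_task
def add_5(a: int) -> int:
    return a + 5


def wf_body(a: Any) -> Any:
    side_effects.append("plain Python ran")  # a non-task call inside the body
    x = add_5(a=a)
    return add_5(a=x)


graph = compile_workflow(wf_body, ["a"])  # the body runs here, once
first = graph.run(a=1)    # executes the recorded nodes only
second = graph.run(a=10)  # the plain Python call does not repeat
```

In this toy model, side_effects holds a single entry no matter how many times the graph is run, which mirrors why non-Flyte calls inside a workflow body do not behave like per-execution logic.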

Example:

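import typing

from flytekit import conditional, task, workflow


# Helper entities assumed by the example below; their bodies are illustrative.
@task
def add_5(a: int) -> int:
    return a + 5


@workflow
def simple_wf() -> int:
    return add_5(a=1)


@workflow
def my_wf_example(a: int) -> typing.Tuple[int, int]: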
    """example

    Workflows can have inputs and return outputs of simple or complex types.

    :param a: input a
    :return: outputs
    """

    x = add_5(a=a)

    # You can use outputs of a previous task as inputs to other nodes.
    z = add_5(a=x)

    # You can call other workflows from within this workflow
    d = simple_wf()

    # You can add conditions that can run on primitive types and execute different branches
    e = conditional("bool").if_(a == 5).then(add_5(a=d)).else_().then(add_5(a=z))

    # Outputs of the workflow have to be outputs returned by prior nodes.
    # A workflow may return no outputs, a single output, or multiple outputs.
    return x, e

Keep in mind that although the body of the function looks like regular Python, it is not. When flytekit scans the workflow function, the objects passed between tasks are not ordinary Python values. Even if you have a task t1() -> int, the a in a = t1() is not an integer, so calling range(a) raises an error.
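The point about task outputs not being ordinary values can be reproduced with a stand-in object. The sketch below is a toy in plain Python: ToyPromise is a hypothetical class, not flytekit's own placeholder type, and t1 here is an ordinary function that imitates what calling a task inside a workflow body returns. The exact exception flytekit raises may differ from the one shown.

```python
class ToyPromise:
    """Hypothetical placeholder for a task output that does not exist yet."""

    def __init__(self, source: str) -> None:
        self.source = source

    def __repr__(self) -> str:
        return f"ToyPromise({self.source!r})"


def t1() -> ToyPromise:
    # Imitates a task call at compile time: a reference to a future output
    # comes back instead of the int that the task would eventually produce.
    return ToyPromise("t1.o0")


a = t1()

try:
    list(range(a))  # range() needs a real integer
    error = None
except TypeError as exc:
    error = exc  # the placeholder cannot be interpreted as an integer
```

Logic that needs the real value, such as looping range(a) times, generally belongs inside a task, where the input arrives as an actual int.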

Please see the user guide for more usage examples.

Parameters
  • _workflow_function – This argument is implicitly passed and represents the decorated function.

  • failure_policy (Optional[flytekit.core.workflow.WorkflowFailurePolicy]) – The failure policy to apply; use one of the options in flytekit.WorkflowFailurePolicy

  • interruptible (bool) – Whether tasks launched from this workflow are interruptible by default
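A signature whose first argument defaults to None and is "implicitly passed" usually follows the common Python idiom that lets a decorator be used both bare and with keyword arguments. The sketch below shows that idiom in plain Python under that assumption. toy_workflow is a hypothetical stand-in, it is not flytekit's source, and it stores the options as plain attributes purely for illustration.

```python
import functools
from typing import Any, Callable, Optional


def toy_workflow(
    _workflow_function: Optional[Callable[..., Any]] = None,
    failure_policy: Optional[str] = None,
    interruptible: bool = False,
):
    """Hypothetical decorator usable as @toy_workflow or @toy_workflow(...)."""

    def wrapper(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def inner(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)

        # Keep the options on the wrapped function so they can be inspected.
        inner.failure_policy = failure_policy
        inner.interruptible = interruptible
        return inner

    if _workflow_function is not None:
        # Bare form: Python passed the decorated function in directly.
        return wrapper(_workflow_function)
    # Parameterised form: return the real decorator for Python to apply next.
    return wrapper


@toy_workflow
def plain() -> int:
    return 1


@toy_workflow(interruptible=True)
def configured() -> int:
    return 2
```

With the real decorator, the equivalent forms would be @workflow and, for example, @workflow(interruptible=True); the first argument is never supplied by hand.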