flytekit.workflow
- flytekit.workflow(_workflow_function: None = None, failure_policy: WorkflowFailurePolicy | None = None, interruptible: bool = False, on_failure: WorkflowBase | Task | None = None, docs: Documentation | None = None, pickle_untyped: bool = False, default_options: Options | None = None) → Callable[[Callable[[...], FuncOut]], PythonFunctionWorkflow]
- flytekit.workflow(_workflow_function: Callable[[P], FuncOut], failure_policy: WorkflowFailurePolicy | None = None, interruptible: bool = False, on_failure: WorkflowBase | Task | None = None, docs: Documentation | None = None, pickle_untyped: bool = False, default_options: Options | None = None) → Callable[[P], FuncOut] | PythonFunctionWorkflow
This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.
Unlike a task, the function body of a workflow is evaluated at serialization time (also known as compile time). While the entire structure of a task can be determined from its function signature alone, flytekit must run through a workflow's function body, because the body is what expresses the workflow structure. It is also important to note that, local execution aside, the body is not evaluated again when the workflow runs on Flyte. For that reason, workflows should not call non-Flyte entities: on the platform, the body is only ever run once.
Example:
```python
@workflow
def my_wf_example(a: int) -> typing.Tuple[int, int]:
    """example

    Workflows can have inputs and return outputs of simple or complex types.

    :param a: input a
    :return: outputs
    """
    x = add_5(a=a)

    # You can use outputs of a previous task as inputs to other nodes.
    z = add_5(a=x)

    # You can call other workflows from within this workflow.
    d = simple_wf()

    # You can add conditions that can run on primitive types and execute different branches.
    e = conditional("bool").if_(a == 5).then(add_5(a=d)).else_().then(add_5(a=z))

    # Outputs of the workflow have to be outputs returned by prior nodes.
    # No outputs, a single output, or multiple outputs are all supported.
    return x, e
```
Again, users should keep in mind that even though the body of the function looks like regular Python, it is actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not typical Python values. So even though you may have a task `t1() -> int`, when `a = t1()` is called, `a` will not be an integer, and calling `range(a)` will raise an error.

Please see the user guide for more usage examples.
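This promise behavior can be sketched in plain Python (no flytekit needed). The `Promise` class below is a hypothetical stand-in for the placeholder objects flytekit actually passes between task calls at compile time:

```python
# Hypothetical stand-in for the placeholder ("promise") objects that
# flytekit passes between task calls while compiling a workflow.
class Promise:
    def __init__(self, node_id: str):
        self.node_id = node_id  # which upstream node produces this value


def t1() -> "Promise":
    # At compile time, a task call binds a promise, not a real int.
    return Promise("t1")


a = t1()
try:
    range(a)  # a is not an actual int during compilation, so this fails
except TypeError as exc:
    print(f"TypeError: {exc}")
```

Operations that need a concrete value (such as `range`, arithmetic, or branching on the value) must therefore happen inside a task, where real values are available at run time.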
- Parameters:
_workflow_function – This argument is implicitly passed and represents the decorated function.
failure_policy – Use the options in flytekit.WorkflowFailurePolicy
interruptible – Whether tasks launched from this workflow are interruptible by default
on_failure – Invoke this workflow or task on failure. The workflow or task has to match the signature of the current workflow, with an additional parameter named error of type Error
docs – Description entity for the workflow
pickle_untyped – This is a flag that allows users to bypass the type-checking that Flytekit does when constructing the workflow. This is not recommended for general use.
default_options – Default options for the workflow when creating a default launch plan. Currently, only labels and annotations may be set as defaults.