flytekit.core.workflow.ImperativeWorkflow

class flytekit.core.workflow.ImperativeWorkflow(name, failure_policy=None, interruptible=False)[source]

An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to applications that construct workflows programmatically.

Assuming you have some tasks like so:

from flytekit import task

@task
def t1(a: str) -> str:
    return a + " world"

@task
def t2():
    print("side effect")

You could create a workflow imperatively like so:

from flytekit import Workflow  # flytekit's alias for ImperativeWorkflow

# Create the workflow with a name. This needs to be unique within the project and takes the place of the function
# name that's used for regular decorated function-based workflows.
wb = Workflow(name="my_workflow")
# Adds a top level input to the workflow. This is like an input to a workflow function.
wb.add_workflow_input("in1", str)
# Call your tasks.
node = wb.add_entity(t1, a=wb.inputs["in1"])
wb.add_entity(t2)
# This is analogous to a return statement
wb.add_workflow_output("from_n0t1", node.outputs["o0"])

This workflow would be identical on the back-end to:

import typing

from flytekit import workflow

nt = typing.NamedTuple("wf_output", [("from_n0t1", str)])

@workflow
def my_workflow(in1: str) -> nt:
    x = t1(a=in1)
    t2()
    return nt(x)

Note that the only reason we need the NamedTuple is so that we can give the output the same name as in the imperative example. The imperative paradigm makes it easier to name workflow outputs, but this isn't a big deal in function-based workflows because output names tend not to be necessary.
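
Once built, the imperative workflow object can be called like any other workflow for a quick local check. A minimal sketch, reusing wb and the tasks defined above:

if __name__ == "__main__":
    # Calling the workflow object runs it locally; keyword arguments map to the
    # inputs added with add_workflow_input. t1 appends " world" to the input.
    print(wb(in1="hello"))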

Parameters:
  • name (str)

  • failure_policy (WorkflowFailurePolicy | None)

  • interruptible (bool)

Methods

add_entity(entity, **kwargs)[source]

Anytime you add an entity, all inputs to that entity must already be bound by passing them as keyword arguments (see the sketch after the parameter listing below).

Parameters:

entity (PythonTask | LaunchPlan | WorkflowBase)

Return type:

Node
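
A minimal sketch of the binding requirement, reusing t1 from above (the workflow name here is illustrative):

wb = Workflow(name="binding_demo")
wb.add_workflow_input("in1", str)
# Inputs are bound through keyword arguments: a workflow input promise, or an
# upstream node's output. Adding t1 without binding "a" would fail, because all
# of an entity's inputs must be bound at the time the entity is added.
node = wb.add_entity(t1, a=wb.inputs["in1"])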

add_launch_plan(launch_plan, **kwargs)[source]
Parameters:

launch_plan (LaunchPlan)

Return type:

Node
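
A hedged sketch of wiring a launch plan into an imperative workflow; child_wf and the names below are illustrative, not part of this API:

from flytekit import LaunchPlan, workflow

@workflow
def child_wf(a: str) -> str:
    return t1(a=a)

lp = LaunchPlan.get_or_create(child_wf, name="child_lp")

wb = Workflow(name="launch_plan_demo")
wb.add_workflow_input("in1", str)
node = wb.add_launch_plan(lp, a=wb.inputs["in1"])
wb.add_workflow_output("out", node.outputs["o0"])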

add_subwf(sub_wf, **kwargs)[source]
Parameters:

sub_wf (WorkflowBase)

Return type:

Node
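
A hedged sketch of adding a decorated workflow as a subworkflow; child_wf and the names below are illustrative:

from flytekit import workflow

@workflow
def child_wf(a: str) -> str:
    return t1(a=a)

wb = Workflow(name="subwf_demo")
wb.add_workflow_input("in1", str)
node = wb.add_subwf(child_wf, a=wb.inputs["in1"])
wb.add_workflow_output("out", node.outputs["o0"])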

add_task(task, **kwargs)[source]
Parameters:

task (PythonTask)

Return type:

Node
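
For tasks, add_task behaves the same as add_entity; a one-line sketch reusing the objects from the class example above:

node = wb.add_task(t1, a=wb.inputs["in1"])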

add_workflow_input(input_name, python_type)[source]

Adds an input to the workflow.

Parameters:
  • input_name (str)

  • python_type (Type)

Return type:

Promise

add_workflow_output(output_name, p, python_type=None)[source]

Add an output with the given name from the given node output.

Parameters:
  • output_name (str)

  • p

  • python_type (Type | None)

compile(**kwargs)
construct_node_metadata()
Return type:

NodeMetadata

create_conditional(name)[source]
Parameters:

name (str)

Return type:

ConditionalSection

execute(**kwargs)[source]

Called by local_execute. This function is how local execution runs for imperative workflows. Because all inputs to an entity must already be bound when it is added via add_entity, the nodes can simply be executed in the order they were added without running into dependency issues; that is, users are forced to declare entities in topological order. To keep track of outputs, a map is created and seeded with the workflow inputs (if any). As each node runs, its outputs are stored in this map. After all nodes have run, the workflow-level outputs are filled in from the map in the same way as any other node's outputs.
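
A minimal sketch of the ordering this implies: each entity is added only after the nodes whose outputs it consumes, so local execution can run nodes in insertion order (t1 is the task from the class example; names are illustrative):

wb = Workflow(name="ordering_demo")
wb.add_workflow_input("in1", str)
n0 = wb.add_entity(t1, a=wb.inputs["in1"])   # upstream node first
n1 = wb.add_entity(t1, a=n0.outputs["o0"])   # downstream node binds to n0's output
wb.add_workflow_output("out", n1.outputs["o0"])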

local_execute(ctx, **kwargs)
Parameters:

ctx (FlyteContext)

Return type:

Tuple[Promise] | Promise | VoidPromise | None

local_execution_mode()
Return type:

Mode

ready()[source]
This function returns whether or not the workflow is in a ready state, which means:
  • It has at least one node

  • All workflow inputs are bound

These conditions assume that all node and workflow I/O changes were made with the functions above, which do additional checking.

Return type:

bool
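
A minimal sketch of the two conditions, reusing t1 from the class example (names are illustrative):

wb = Workflow(name="ready_demo")
wb.add_workflow_input("in1", str)
assert not wb.ready()   # no nodes yet, and "in1" is still unbound

wb.add_entity(t1, a=wb.inputs["in1"])
assert wb.ready()       # at least one node, and every workflow input is bound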

Attributes

compilation_state

Compilation happens incrementally, one task or other entity call at a time, which is why this workflow class has to keep track of its own compilation state.

docs
failure_node
inputs

This holds the input promises to the workflow. The nodes in these Promise objects should always point to the global start node.

interface
name
nodes
on_failure
output_bindings
python_interface
short_name
workflow_metadata
workflow_metadata_defaults