flytekit.core.workflow.ImperativeWorkflow
- class flytekit.core.workflow.ImperativeWorkflow(name, failure_policy=None, interruptible=False)
An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications.
Assuming you have some tasks like so
    from flytekit import task

    @task
    def t1(a: str) -> str:
        return a + " world"

    @task
    def t2():
        print("side effect")
You could create a workflow imperatively like so
    # flytekit exports ImperativeWorkflow under the name Workflow.
    from flytekit import Workflow

    # Create the workflow with a name. This needs to be unique within the project and takes the place of the function
    # name that's used for regular decorated function-based workflows.
    wb = Workflow(name="my_workflow")
    # Adds a top level input to the workflow. This is like an input to a workflow function.
    wb.add_workflow_input("in1", str)
    # Call your tasks.
    node = wb.add_entity(t1, a=wb.inputs["in1"])
    wb.add_entity(t2)
    # This is analogous to a return statement
    wb.add_workflow_output("from_n0t1", node.outputs["o0"])
This workflow would be identical on the back-end to
    import typing

    from flytekit import workflow

    nt = typing.NamedTuple("wf_output", [("from_n0t1", str)])

    @workflow
    def my_workflow(in1: str) -> nt:
        x = t1(a=in1)
        t2()
        return nt(x)
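The NamedTuple used in the function-based version can be explored on its own with only the standard library. The following is a minimal sketch, not part of flytekit; the variable names `result`, `output_names`, and `named_value` are chosen here purely for illustration.

```python
import typing

# Same declaration as in the function-based workflow.
nt = typing.NamedTuple("wf_output", [("from_n0t1", str)])

# Positional construction, mirroring `return nt(x)`.
result = nt("hello world")

# The single field carries the name chosen for the workflow output.
output_names = nt._fields
named_value = result.from_n0t1
```

The field name is the only thing the NamedTuple contributes; the value itself is still a plain string.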
Note that the only reason we need the NamedTuple is so we can name the output the same thing as in the imperative example. The imperative paradigm makes the naming of workflow outputs easier, but this isn’t a big deal in function-workflows because names tend to not be necessary.
Methods
- Parameters
name (str) –
failure_policy (Optional[WorkflowFailurePolicy]) –
interruptible (bool) –
- add_entity(entity, **kwargs)
Anytime you add an entity, all the inputs to the entity must be bound.
- Parameters
entity (Union[flytekit.core.base_task.PythonTask, flytekit.core.launch_plan.LaunchPlan, flytekit.core.workflow.WorkflowBase]) –
- Return type
flytekit.core.node.Node
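The rule that every input of an added entity must be bound can be illustrated with a small standard-library sketch. This is not how flytekit performs the check: plain Python functions stand in for tasks, and the helper `unbound_inputs` is a hypothetical name introduced only for this example.

```python
import inspect


def unbound_inputs(fn, **kwargs):
    """Return the parameter names of `fn` that `kwargs` leaves unbound."""
    params = inspect.signature(fn).parameters
    return [name for name in params if name not in kwargs]


# Plain functions standing in for the t1 and t2 tasks.
def t1(a: str) -> str:
    return a + " world"


def t2() -> None:
    print("side effect")


missing_for_t1 = unbound_inputs(t1)                 # `a` was not supplied
missing_when_bound = unbound_inputs(t1, a="hello")  # every input supplied
missing_for_t2 = unbound_inputs(t2)                 # no inputs to bind
```

An add_entity-style method could reject the call whenever such a list is non-empty, which is the behaviour the description above requires.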
- add_launch_plan(launch_plan, **kwargs)
- Parameters
launch_plan (flytekit.core.launch_plan.LaunchPlan) –
- Return type
flytekit.core.node.Node
- add_subwf(sub_wf, **kwargs)
- Parameters
sub_wf (flytekit.core.workflow.WorkflowBase) –
- Return type
flytekit.core.node.Node
- add_task(task, **kwargs)
- Parameters
task (flytekit.core.base_task.PythonTask) –
- Return type
flytekit.core.node.Node
- add_workflow_input(input_name, python_type)
Adds an input to the workflow.
- Parameters
- Return type
- add_workflow_output(output_name, p, python_type=None)
Add an output with the given name from the given node output.
- Parameters
output_name (str) –
p (Union[flytekit.core.promise.Promise, List[flytekit.core.promise.Promise], Dict[str, flytekit.core.promise.Promise]]) –
- compile(**kwargs)
- construct_node_metadata()
- Return type
flytekit.models.core.workflow.NodeMetadata
- create_conditional(name)
- Parameters
name (str) –
- Return type
flytekit.core.condition.ConditionalSection
- execute(**kwargs)
Called by local_execute. This function is how local execution for imperative workflows runs. Because every input to an entity must already be declared when the entity is added with add_entity, we can iterate through the nodes in order without running into dependency issues. In other words, the user is forced to declare entities in topologically sorted order. To keep track of outputs, we start with a map that contains only the workflow inputs (if any). As nodes run, their outputs are stored in this map. After all nodes have run, the workflow-level outputs are filled in the same way as for any previous node.
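The in-order execution with a running results map can be sketched in a few lines of plain Python. This is a toy model and not flytekit's implementation: the function `run_in_order`, the tuple layout of a node, and the `"<node_id>.o0"` key format are all assumptions made for this example, and ordinary functions stand in for tasks.

```python
from typing import Any, Callable, Dict, List, Tuple

# A toy node: (node_id, callable, bindings), where bindings maps a
# parameter name to a key in the results map.
Node = Tuple[str, Callable[..., Any], Dict[str, str]]


def run_in_order(
    nodes: List[Node],
    workflow_inputs: Dict[str, Any],
    output_bindings: Dict[str, str],
) -> Dict[str, Any]:
    # The map starts out holding only the workflow inputs.
    results: Dict[str, Any] = dict(workflow_inputs)
    for node_id, fn, bindings in nodes:
        # Every key must already be present, which holds when nodes were
        # declared in topological order.
        kwargs = {param: results[key] for param, key in bindings.items()}
        results[f"{node_id}.o0"] = fn(**kwargs)
    # Workflow outputs are looked up in the same map as node inputs.
    return {name: results[key] for name, key in output_bindings.items()}


def t1(a: str) -> str:
    return a + " world"


def t2() -> None:
    print("side effect")


nodes: List[Node] = [("n0", t1, {"a": "in1"}), ("n1", t2, {})]
outputs = run_in_order(nodes, {"in1": "hello"}, {"from_n0t1": "n0.o0"})
```

If a node referred to a key that an earlier node had not yet produced, the lookup would fail with a KeyError, which is why declaration order matters in this model.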
- local_execute(ctx, **kwargs)
- Parameters
- Return type
Optional[Union[Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise]]
- local_execution_mode()
- ready()
This function returns whether or not the workflow is in a ready state, which means it:
  - has at least one node, and
  - has all of its workflow inputs bound.
These conditions assume that all nodes and workflow i/o changes were done with the functions above, which do additional checking.
- Return type
bool
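The two readiness conditions can be modelled with a small standalone class. This is a sketch under stated assumptions rather than flytekit code: `ToyWorkflow` and its fields are hypothetical, and "bound" is interpreted here as "consumed by at least one added node".

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class ToyWorkflow:
    nodes: List[str] = field(default_factory=list)
    unbound_inputs: Set[str] = field(default_factory=set)

    def add_workflow_input(self, name: str) -> None:
        # A freshly declared input is unbound until a node consumes it.
        self.unbound_inputs.add(name)

    def add_entity(self, node_id: str, *consumed_inputs: str) -> None:
        self.nodes.append(node_id)
        self.unbound_inputs.difference_update(consumed_inputs)

    def ready(self) -> bool:
        # Ready means: at least one node, and no unbound workflow inputs.
        return len(self.nodes) > 0 and len(self.unbound_inputs) == 0


wf = ToyWorkflow()
ready_when_empty = wf.ready()

wf.add_workflow_input("in1")
wf.add_workflow_input("in2")
wf.add_entity("n0", "in1")
ready_with_unbound_input = wf.ready()

wf.add_entity("n1", "in2")
ready_when_all_bound = wf.ready()
```

Keeping the set of unbound inputs up to date inside the mutating methods is what lets the readiness check stay a cheap lookup, in line with the note that the conditions rely on changes going through the provided functions.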
Attributes
- compilation_state
Compilation is done a bit at a time, one task or other entity call at a time. This is why this workflow class has to keep track of its own compilation state.
- docs
- inputs
This holds the input promises to the workflow. The nodes in these Promise objects should always point to the global start node.
- interface
- name
- nodes
- output_bindings
- python_interface
- short_name
- workflow_metadata
- workflow_metadata_defaults