
.. DO NOT EDIT.
.. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY.
.. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE:
.. "auto/core/control_flow/subworkflows.py"
.. LINE NUMBERS ARE GIVEN BELOW.

.. only:: html

    .. note::
        :class: sphx-glr-download-link-note

        Click :ref:`here ` to download the full example code

.. rst-class:: sphx-glr-example-title

.. _sphx_glr_auto_core_control_flow_subworkflows.py:

Subworkflows
------------

Subworkflows are similar to :ref:`launch plans `, since they allow users to kick off one workflow from inside another.

What's the difference?
Think of launch plans as pass by pointer and subworkflows as pass by value.

.. note::

    Subworkflows exist because this is how Flyte handles dynamic workflows.
    Instead of hiding this functionality, we expose it at the user level. Subworkflows have pros and cons, as described below.

When Should I Use Subworkflows?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you want to limit parallelism within a workflow and its launched sub-flows, subworkflows provide a clean way to achieve that because they execute within the same context as the parent workflow.
Thus, all nodes of a subworkflow are subject to the overall constraint on the parent workflow.

Consider this: when you include Workflow A as a subworkflow of Workflow B and run Workflow B, the entire graph of Workflow A is copied into Workflow B at the point where it is called.

Let's understand subworkflows with an example.

.. GENERATED FROM PYTHON SOURCE LINES 29-32

Example
^^^^^^^

We import the required dependencies into the environment.

.. GENERATED FROM PYTHON SOURCE LINES 32-37

.. code-block:: default

    import typing
    from typing import Tuple

    from flytekit import task, workflow


.. GENERATED FROM PYTHON SOURCE LINES 38-40

Next, we define a task that uses named outputs.
As a best practice, we usually define the ``NamedTuple`` as a distinct type (although it can be defined inline).

.. GENERATED FROM PYTHON SOURCE LINES 40-48

.. code-block:: default

    op = typing.NamedTuple("OutputsBC", t1_int_output=int, c=str)


    @task
    def t1(a: int) -> op:
        return op(a + 2, "world")


.. GENERATED FROM PYTHON SOURCE LINES 49-50

Then we define a subworkflow. It is declared like a typical workflow and can run like any other workflow.

.. GENERATED FROM PYTHON SOURCE LINES 50-57

.. code-block:: default

    @workflow
    def my_subwf(a: int = 42) -> Tuple[str, str]:
        x, y = t1(a=a)
        u, v = t1(a=x)
        return y, v


.. GENERATED FROM PYTHON SOURCE LINES 58-64

We call the workflow declared above in a `parent` workflow below, which showcases how to override the node name of a task (or of a subworkflow).
Typically, nodes are just named sequentially: ``n0``, ``n1``, and so on. Since the inner ``my_subwf`` also has a ``n0``, you may wish to change the name of the first one.
Leaving the name unchanged is also fine: because node IDs must be distinct within a workflow graph, Flyte automatically prepends an attribute to the inner ``n0``.

.. GENERATED FROM PYTHON SOURCE LINES 64-71

.. code-block:: default

    @workflow
    def parent_wf(a: int) -> Tuple[int, str, str]:
        x, y = t1(a=a).with_overrides(node_name="node-t1-parent")
        u, v = my_subwf(a=x)
        return x, u, v


.. GENERATED FROM PYTHON SOURCE LINES 72-74

.. note::

    The ``with_overrides`` method gives the graph node a new name for better rendering or readability.

.. GENERATED FROM PYTHON SOURCE LINES 76-77

You can run the subworkflows locally.

.. GENERATED FROM PYTHON SOURCE LINES 77-91

.. code-block:: default

    if __name__ == "__main__":
        print(f"Running parent_wf(a=3) {parent_wf(a=3)}")

    # We can also nest a workflow that has a subworkflow within another workflow.
    # Workflows can be simply composed from other workflows, even if the other
    # workflows are standalone entities. Each of the workflows in this module
    # can exist and run independently.
    @workflow
    def nested_parent_wf(a: int) -> Tuple[int, str, str, str]:
        x, y = my_subwf(a=a)
        m, n, o = parent_wf(a=a)
        return m, n, o, y


.. GENERATED FROM PYTHON SOURCE LINES 92-93

You can run the nested workflows locally as well.

.. GENERATED FROM PYTHON SOURCE LINES 93-96

.. code-block:: default

    if __name__ == "__main__":
        print(f"Running nested_parent_wf(a=3) {nested_parent_wf(a=3)}")


.. GENERATED FROM PYTHON SOURCE LINES 97-98

.. note::

    You can chain and execute subworkflows in the same way as chained :ref:`Flyte tasks`.

.. GENERATED FROM PYTHON SOURCE LINES 101-116

External Workflow
^^^^^^^^^^^^^^^^^

When a launch plan is used within a workflow to launch the execution of a previously defined workflow, a new external execution is launched.
It has a separate execution ID and can be observed as a distinct entity in FlyteConsole/Flytectl.
Such executions may have separate parallelism constraints since the context is not shared.
We refer to such external invocations of a workflow using launch plans from a parent workflow as ``External Workflows``.

.. tip::

    If your deployment uses :ref:`multicluster-setup `, then external workflows may allow you to distribute the workload of a workflow across multiple clusters.

Here is an example demonstrating external workflows:

.. GENERATED FROM PYTHON SOURCE LINES 118-119

We import the required dependencies into the environment.

.. GENERATED FROM PYTHON SOURCE LINES 119-125

.. code-block:: default

    import typing  # noqa: E402
    from collections import Counter  # noqa: E402
    from typing import Dict, Tuple  # noqa: E402

    from flytekit import LaunchPlan, task, workflow  # noqa: E402


.. GENERATED FROM PYTHON SOURCE LINES 126-127

We define a task that computes the frequency of every word in a string and returns a dictionary mapping every word to its count.

.. GENERATED FROM PYTHON SOURCE LINES 127-137

.. code-block:: default

    @task
    def count_freq_words(input_string1: str) -> Dict:
        # Example input: "The cat sat on the mat"
        # Split on whitespace, then count how often each word occurs.
        words = input_string1.split()
        word_count = dict(Counter(words))
        return word_count


.. GENERATED FROM PYTHON SOURCE LINES 138-139

We define a workflow that executes the previously defined task.

.. GENERATED FROM PYTHON SOURCE LINES 139-145

.. code-block:: default

    @workflow
    def ext_workflow(my_input: str) -> Dict:
        result = count_freq_words(input_string1=my_input)
        return result


.. GENERATED FROM PYTHON SOURCE LINES 146-147

Next, we create a launch plan for ``ext_workflow``.

.. GENERATED FROM PYTHON SOURCE LINES 147-152

.. code-block:: default

    external_lp = LaunchPlan.get_or_create(
        ext_workflow,
        "parent_workflow_execution",
    )


.. GENERATED FROM PYTHON SOURCE LINES 153-154

We define another task that returns the repeated keys (in our case, words) from a dictionary.

.. GENERATED FROM PYTHON SOURCE LINES 154-162

.. code-block:: default

    @task
    def count_repetitive_words(word_counter: Dict) -> typing.List[str]:
        # Keep only the words that occur more than once.
        repeated_words = [key for key, value in word_counter.items() if value > 1]
        return repeated_words


.. GENERATED FROM PYTHON SOURCE LINES 163-164

We define a workflow that triggers the launch plan of the previously defined workflow.

.. GENERATED FROM PYTHON SOURCE LINES 164-171

.. code-block:: default

    @workflow
    def parent_workflow(my_input1: str) -> typing.List[str]:
        my_op1 = external_lp(my_input=my_input1)
        my_op2 = count_repetitive_words(word_counter=my_op1)
        return my_op2


.. GENERATED FROM PYTHON SOURCE LINES 172-173

Here, ``parent_workflow`` launches ``ext_workflow`` as an external workflow through ``external_lp``.
This can be run locally too.

.. GENERATED FROM PYTHON SOURCE LINES 173-176

.. code-block:: default

    if __name__ == "__main__":
        print("Running parent workflow...")
        print(parent_workflow(my_input1="the cat took the apple and ate the apple"))


.. rst-class:: sphx-glr-timing

   **Total running time of the script:** ( 0 minutes  0.000 seconds)


.. _sphx_glr_download_auto_core_control_flow_subworkflows.py:

.. only:: html

    .. container:: sphx-glr-footer
        :class: sphx-glr-footer-example

        .. container:: sphx-glr-download sphx-glr-download-python

            :download:`Download Python source code: subworkflows.py `

        .. container:: sphx-glr-download sphx-glr-download-jupyter

            :download:`Download Jupyter notebook: subworkflows.ipynb `


.. only:: html

    .. rst-class:: sphx-glr-signature

        `Gallery generated by Sphinx-Gallery `_