
Tags: Intermediate

Flytekit elevates conditions to a first-class construct named conditional, providing a powerful mechanism for selectively executing branches in a workflow. Conditions leverage static or dynamic data generated by tasks or received as workflow inputs. While conditions are highly performant in their evaluation, it’s important to note that they are restricted to specific binary and logical operators and are applicable only to primitive values.

To begin, import the necessary libraries.

import random

from flytekit import conditional, task, workflow

Simple branch#

In this example, we introduce two tasks, calculate_circle_circumference and calculate_circle_area. The workflow dynamically chooses between these tasks based on whether the input falls within the fraction range (0-1) or not.

def calculate_circle_circumference(radius: float) -> float:
    return 2 * 3.14 * radius  # Task to calculate the circumference of a circle

def calculate_circle_area(radius: float) -> float:
    return 3.14 * radius * radius  # Task to calculate the area of a circle

def shape_properties(radius: float) -> float:
    return (
        .if_((radius >= 0.1) & (radius < 1.0))

if __name__ == "__main__":
    radius_small = 0.5
    print(f"Circumference of circle (radius={radius_small}): {shape_properties(radius=radius_small)}")

    radius_large = 3.0
    print(f"Area of circle (radius={radius_large}): {shape_properties(radius=radius_large)}")

Multiple branches#

We establish an if condition with multiple branches, which will result in a failure if none of the conditions is met. It’s important to note that any conditional statement in Flyte is expected to be complete, meaning that all possible branches must be accounted for.

def shape_properties_with_multiple_branches(radius: float) -> float:
    return (
        .if_((radius >= 0.1) & (radius < 1.0))
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .fail("The input must be within the range of 0 to 10.")


Take note of the usage of bitwise operators (&). Due to Python’s PEP-335, the logical and, or and not operators cannot be overloaded. Flytekit employs bitwise & and | as equivalents for logical and and or operators, a convention also observed in other libraries.

Consuming the output of a conditional#

Here, we write a task that consumes the output returned by a conditional.

def shape_properties_accept_conditional_output(radius: float) -> float:
    result = (
        .if_((radius >= 0.1) & (radius < 1.0))
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .fail("The input must exist between 0 and 10.")
    return calculate_circle_area(radius=result)

if __name__ == "__main__":
    print(f"Circumference of circle x Area of circle (radius={radius_small}): {shape_properties(radius=5.0)}")

Using the output of a previous task in a conditional#

You can check if a boolean returned from the previous task is True, but unary operations are not supported directly. Instead, use the is_true, is_false and is_none methods on the result.

def coin_toss(seed: int) -> bool:
    Mimic a condition to verify the successful execution of an operation
    r = random.Random(seed)
    if r.random() < 0.5:
        return True
    return False

def failed() -> int:
    Mimic a task that handles failure
    return -1

def success() -> int:
    Mimic a task that handles success
    return 0

def boolean_wf(seed: int = 5) -> int:
    result = coin_toss(seed=seed)
    return conditional("coin_toss").if_(result.is_true()).then(success()).else_().then(failed())


How do output values acquire these methods? In a workflow, direct access to outputs is not permitted. Inputs and outputs are automatically encapsulated in a special object known as flytekit.extend.Promise.

Using boolean workflow inputs in a conditional#

You can directly pass a boolean to a workflow.

def boolean_input_wf(boolean_input: bool) -> int:
    return conditional("boolean_input_conditional").if_(boolean_input.is_true()).then(success()).else_().then(failed())


Observe that the passed boolean possesses a method called is_true. This boolean resides within the workflow context and is encapsulated in a specialized Flytekit object. This special object enables it to exhibit additional behavior.

You can run the workflows locally as follows:

if __name__ == "__main__":
    print("Running boolean_wf a few times...")
    for index in range(0, 5):
        print(f"The output generated by boolean_wf = {boolean_wf(seed=index)}")
            f"Boolean input: {True if index < 2 else False}; workflow output: {boolean_input_wf(boolean_input=True if index < 2 else False)}"

Nested conditionals#

You can nest conditional sections arbitrarily inside other conditional sections. However, these nested sections can only be in the then part of a conditional block.

def nested_conditions(radius: float) -> float:
    return (
        .if_((radius >= 0.1) & (radius < 1.0))
            .if_(radius < 0.5)
            .elif_((radius >= 0.5) & (radius < 0.9))
            .fail("0.9 is an outlier.")
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .fail("The input must be within the range of 0 to 10.")

if __name__ == "__main__":
    print(f"nested_conditions(0.4): {nested_conditions(radius=0.4)}")

Using the output of a task in a conditional#

Let’s write a fun workflow that triggers the calculate_circle_circumference task in the event of a “heads” outcome, and alternatively, runs the calculate_circle_area task in the event of a “tail” outcome.

def consume_task_output(radius: float, seed: int = 5) -> float:
    is_heads = coin_toss(seed=seed)
    return (

You can run the workflow locally as follows:

if __name__ == "__main__":
    default_seed_output = consume_task_output(radius=0.4)
        f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_circumference => {default_seed_output}"

    custom_seed_output = consume_task_output(radius=0.4, seed=7)
    print(f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_area => {custom_seed_output}")

Run the example on the Flyte cluster#

To run the provided workflows on the Flyte cluster, use the following commands:

pyflyte run --remote \ \
  shape_properties --radius 3.0
pyflyte run --remote \ \
  shape_properties_with_multiple_branches --radius 11.0
pyflyte run --remote \ \
  shape_properties_accept_conditional_output --radius 0.5
pyflyte run --remote \ \
pyflyte run --remote \ \
  boolean_input_wf --boolean_input
pyflyte run --remote \ \
  nested_conditions --radius 0.7
pyflyte run --remote \ \
  consume_task_output --radius 0.4 --seed 7