Conditionals

Tags: Intermediate

Flytekit elevates conditions to a first-class construct named conditional, which selectively executes branches in a workflow. Conditions can use static or dynamic data produced by tasks or received as workflow inputs. Conditions are cheap to evaluate, but they are restricted to specific binary and logical operators and apply only to primitive values.

Note

To clone and run the example code on this page, see the Flytesnacks repo.

To begin, import the necessary libraries.

advanced_composition/conditional.py
import random

from flytekit import conditional, task, workflow

Simple branch

In this example, we introduce two tasks, calculate_circle_circumference and calculate_circle_area. The workflow chooses between them at run time: it calls calculate_circle_circumference when the radius lies in the range [0.1, 1.0) and calculate_circle_area otherwise.

advanced_composition/conditional.py
@task
def calculate_circle_circumference(radius: float) -> float:
    """Calculate the circumference of a circle."""
    return 2 * 3.14 * radius


@task
def calculate_circle_area(radius: float) -> float:
    """Calculate the area of a circle."""
    return 3.14 * radius * radius


@workflow
def shape_properties(radius: float) -> float:
    return (
        conditional("shape_properties")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(calculate_circle_area(radius=radius))
    )


if __name__ == "__main__":
    radius_small = 0.5
    print(f"Circumference of circle (radius={radius_small}): {shape_properties(radius=radius_small)}")

    radius_large = 3.0
    print(f"Area of circle (radius={radius_large}): {shape_properties(radius=radius_large)}")

Multiple branches

The next workflow defines a conditional with multiple branches and fails explicitly if none of the conditions is met. Every conditional in Flyte must be complete: all possible branches must be accounted for.

advanced_composition/conditional.py
@workflow
def shape_properties_with_multiple_branches(radius: float) -> float:
    return (
        conditional("shape_properties_with_multiple_branches")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .then(calculate_circle_area(radius=radius))
        .else_()
        .fail("The input must be within the range of 0 to 10.")
    )

Note

Take note of the use of the bitwise operator &. Python's logical and, or and not operators cannot be overloaded (PEP 335, which proposed making them overloadable, was rejected). Flytekit therefore uses bitwise & and | as equivalents of logical and and or, a convention that other libraries also follow.
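
To make the reason for & concrete, here is a minimal plain-Python sketch that does not use Flytekit. The Expr class is hypothetical and exists only for illustration: comparison and bitwise operators can be overloaded to build a deferred expression, whereas and always asks its left operand for a truth value and cannot be intercepted.

```python
class Expr:
    """Hypothetical stand-in for a deferred value; not a Flytekit class."""

    def __init__(self, text: str):
        self.text = text

    def __ge__(self, other) -> "Expr":
        # Record the comparison instead of evaluating it.
        return Expr(f"({self.text} >= {other})")

    def __lt__(self, other) -> "Expr":
        return Expr(f"({self.text} < {other})")

    def __and__(self, other: "Expr") -> "Expr":
        # `&` is overloadable, so two recorded comparisons can be combined.
        return Expr(f"({self.text} & {other.text})")

    def __bool__(self) -> bool:
        # `and`, `or` and `not` end up here; the value is not known yet.
        raise TypeError("deferred expression has no truth value yet")


radius = Expr("radius")

combined = (radius >= 0.1) & (radius < 1.0)
print(combined.text)

try:
    (radius >= 0.1) and (radius < 1.0)
except TypeError as err:
    and_error = str(err)
    print(f"'and' failed: {and_error}")
```

The parentheses around each comparison matter, because & binds more tightly than >= and < in Python.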

Consuming the output of a conditional

Here, the workflow passes the output returned by a conditional to a task.

advanced_composition/conditional.py
@workflow
def shape_properties_accept_conditional_output(radius: float) -> float:
    result = (
        conditional("shape_properties_accept_conditional_output")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .then(calculate_circle_area(radius=radius))
        .else_()
        .fail("The input must be within the range of 0 to 10.")
    )
    return calculate_circle_area(radius=result)


if __name__ == "__main__":
    print(f"Area of a circle whose radius is the conditional output (radius={radius_small}): {shape_properties_accept_conditional_output(radius=radius_small)}")
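
As a rough check of the arithmetic, the following plain-Python sketch mirrors the branch conditions and formulas of the workflow above without using Flytekit. The function names circumference, area and chained are illustrative only, and ValueError stands in for the failure branch.

```python
import math

PI = 3.14  # the same approximation used by the tasks above


def circumference(radius: float) -> float:
    return 2 * PI * radius


def area(radius: float) -> float:
    return PI * radius * radius


def chained(radius: float) -> float:
    """Pick a branch, then feed its result to area() as the new radius."""
    if 0.1 <= radius < 1.0:
        intermediate = circumference(radius)
    elif 1.0 <= radius <= 10.0:
        intermediate = area(radius)
    else:
        raise ValueError("The input must be within the range of 0 to 10.")
    return area(intermediate)


# radius 0.5: circumference is 3.14, then the area of a circle of radius 3.14
print(chained(0.5))
# radius 2.0: area is 12.56, then the area of a circle of radius 12.56
print(chained(2.0))
```

For a radius of 0.5 the result is about 30.96, and for a radius of 2.0 it is about 495.35.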

Using the output of a previous task in a conditional

You can check if a boolean returned from the previous task is True, but unary operations are not supported directly. Instead, use the is_true, is_false and is_none methods on the result.

advanced_composition/conditional.py
@task
def coin_toss(seed: int) -> bool:
    """
    Mimic a condition to verify the successful execution of an operation
    """
    r = random.Random(seed)
    return r.random() < 0.5


@task
def failed() -> int:
    """
    Mimic a task that handles failure
    """
    return -1


@task
def success() -> int:
    """
    Mimic a task that handles success
    """
    return 0


@workflow
def boolean_wf(seed: int = 5) -> int:
    result = coin_toss(seed=seed)
    return conditional("coin_toss").if_(result.is_true()).then(success()).else_().then(failed())

Note

How do output values acquire these methods? In a workflow, direct access to outputs is not permitted. Inputs and outputs are automatically encapsulated in a special object known as flytekit.extend.Promise.
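
The idea of a placeholder object that carries such methods can be sketched in plain Python. This is not Flytekit's implementation: FakePromise, Comparison and evaluate are hypothetical names, and the sketch only shows how a method like is_true can return a description of a check that is evaluated later, once the value exists.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Comparison:
    """Hypothetical record of a deferred check; not a Flytekit class."""

    name: str
    op: str
    expected: object


class FakePromise:
    """Hypothetical placeholder for a value that does not exist yet."""

    def __init__(self, name: str):
        self.name = name

    def is_true(self) -> Comparison:
        return Comparison(self.name, "==", True)

    def is_false(self) -> Comparison:
        return Comparison(self.name, "==", False)

    def is_none(self) -> Comparison:
        return Comparison(self.name, "is", None)


def evaluate(check: Comparison, resolved: dict) -> bool:
    """Evaluate a recorded check once the real value is available."""
    value = resolved[check.name]
    if check.op == "is":
        return value is check.expected
    return value == check.expected


# While the workflow is being assembled, only the placeholder exists.
result = FakePromise("coin_toss_output")
check = result.is_true()
print(check)

# Later, when the task has produced a value, the check can be evaluated.
print(evaluate(check, {"coin_toss_output": True}))
```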

Using boolean workflow inputs in a conditional

You can directly pass a boolean to a workflow.

advanced_composition/conditional.py
@workflow
def boolean_input_wf(boolean_input: bool) -> int:
    return conditional("boolean_input_conditional").if_(boolean_input.is_true()).then(success()).else_().then(failed())

Note

Observe that the passed boolean possesses a method called is_true. This boolean resides within the workflow context and is encapsulated in a specialized Flytekit object. This special object enables it to exhibit additional behavior.

You can run the workflows locally as follows:

advanced_composition/conditional.py
if __name__ == "__main__":
    print("Running boolean_wf a few times...")
    for index in range(0, 5):
        print(f"The output generated by boolean_wf = {boolean_wf(seed=index)}")
        boolean_input = index < 2  # True for the first two iterations only
        print(f"Boolean input: {boolean_input}; workflow output: {boolean_input_wf(boolean_input=boolean_input)}")

Nested conditionals

You can nest conditional sections arbitrarily inside other conditional sections. However, these nested sections can only be in the then part of a conditional block.

advanced_composition/conditional.py
@workflow
def nested_conditions(radius: float) -> float:
    return (
        conditional("nested_conditions")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(
            conditional("inner_nested_conditions")
            .if_(radius < 0.5)
            .then(calculate_circle_circumference(radius=radius))
            .elif_((radius >= 0.5) & (radius < 0.9))
            .then(calculate_circle_area(radius=radius))
            .else_()
            .fail("0.9 is an outlier.")
        )
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .then(calculate_circle_area(radius=radius))
        .else_()
        .fail("The input must be within the range of 0 to 10.")
    )


if __name__ == "__main__":
    print(f"nested_conditions(0.4): {nested_conditions(radius=0.4)}")
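
To see which inputs reach which branch, the conditions of nested_conditions can be mirrored in ordinary Python. This sketch is not Flytekit code: nested_branch is an illustrative helper that returns the name of the task the workflow would call, and ValueError stands in for .fail.

```python
def nested_branch(radius: float) -> str:
    """Return the name of the task selected for the given radius."""
    if 0.1 <= radius < 1.0:
        # Inner conditional: only reached from the outer "then" branch.
        if radius < 0.5:
            return "calculate_circle_circumference"
        elif 0.5 <= radius < 0.9:
            return "calculate_circle_area"
        raise ValueError("0.9 is an outlier.")
    elif 1.0 <= radius <= 10.0:
        return "calculate_circle_area"
    raise ValueError("The input must be within the range of 0 to 10.")


for radius in (0.05, 0.4, 0.7, 0.95, 5.0, 11.0):
    try:
        print(radius, "->", nested_branch(radius))
    except ValueError as err:
        print(radius, "-> fails:", err)
```

Two details follow from the conditions as written: every radius from 0.9 up to, but not including, 1.0 hits the inner failure, and a radius below 0.1 hits the outer failure even though the message mentions the range 0 to 10.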

Using the output of a task in a conditional

Let’s write a fun workflow that triggers the calculate_circle_circumference task in the event of a “heads” outcome and runs the calculate_circle_area task in the event of a “tails” outcome.

advanced_composition/conditional.py
@workflow
def consume_task_output(radius: float, seed: int = 5) -> float:
    is_heads = coin_toss(seed=seed)
    return (
        conditional("double_or_square")
        .if_(is_heads.is_true())
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(calculate_circle_area(radius=radius))
    )

You can run the workflow locally as follows:

advanced_composition/conditional.py
if __name__ == "__main__":
    # The branch taken depends on what coin_toss returns for the given seed.
    default_seed_output = consume_task_output(radius=0.4)
    print(f"Executing consume_task_output(0.4) with the default seed=5. Output: {default_seed_output}")

    custom_seed_output = consume_task_output(radius=0.4, seed=7)
    print(f"Executing consume_task_output(0.4, seed=7). Output: {custom_seed_output}")
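
Because coin_toss seeds its own random.Random instance, the outcome for a given seed is reproducible. The plain-Python sketch below, which does not use Flytekit, applies the same logic outside a task so you can check which branch a seed selects before running the workflow. The names coin_toss_local and branch_for are illustrative only.

```python
import random


def coin_toss_local(seed: int) -> bool:
    """Same logic as the coin_toss task, without the @task decorator."""
    return random.Random(seed).random() < 0.5


def branch_for(seed: int) -> str:
    """Name of the task that consume_task_output would call for this seed."""
    if coin_toss_local(seed):
        return "calculate_circle_circumference"
    return "calculate_circle_area"


for seed in (5, 7):
    print(f"seed={seed}: heads={coin_toss_local(seed)}, branch={branch_for(seed)}")
```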

Run the example on the Flyte cluster

To run the provided workflows on the Flyte cluster, use the following commands:

pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
  shape_properties --radius 3.0
pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
  shape_properties_with_multiple_branches --radius 11.0
pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
  shape_properties_accept_conditional_output --radius 0.5
pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
  boolean_wf
pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
  boolean_input_wf --boolean_input
pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
  nested_conditions --radius 0.7
pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
  consume_task_output --radius 0.4 --seed 7