Running and scheduling workflows

Flyte supports the development and debugging of tasks and workflows in a local setting, which increases the iteration speed of building out your data- or machine-learning-driven applications.

Running your workflows locally is great for the initial stages of testing, but what if you need to make sure that they run as intended on a Flyte cluster? And, what if you need to run them on a regular cadence?

In this guide, we’ll cover how to run and schedule workflows for both development and production use cases.

Prerequisites

This guide assumes that you’ve completed the previous guides for Creating a Flyte project and Packaging and Registering Workflows.

Create a FlyteRemote object

In “Running a workflow locally”, you saw how to run Flyte workflows with pyflyte run when working with standalone scripts.

Once you’re working with larger projects where you’ve registered workflows to a Flyte cluster, we recommend using the FlyteRemote client to run workflows from a Python runtime. First, let’s create a FlyteRemote object:

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)

Running a workflow

You can run workflows with the FlyteRemote execute() method, passing a dictionary of inputs whose names and value types match the interface defined by the workflow.

If you have access to the @workflow-decorated function in your Python runtime environment, you can import and execute it directly. Before doing so, register the workflow to the cluster:

pyflyte register workflows

Then import the workflow and execute it:

from workflows.example import wf

execution = remote.execute(
    wf,
    inputs={"name": "Kermit"},
)
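The inputs dictionary is keyed by the workflow's parameter names, and each value must match the declared type. As a rough local analogy only (this is not how Flyte validates inputs), the hypothetical helper check_inputs below compares an inputs dictionary against a plain Python function's signature and type hints, so you can spot missing, unexpected, or mistyped names before sending anything to a cluster. The function wf here is a hypothetical stand-in, not the decorated workflow.

```python
import inspect
from typing import Any, Callable, Dict, List, get_origin, get_type_hints


def check_inputs(fn: Callable[..., Any], inputs: Dict[str, Any]) -> List[str]:
    """Return problems found when matching `inputs` to `fn`'s signature.

    Hypothetical helper: a local sanity check only, not part of flytekit.
    """
    problems: List[str] = []
    sig = inspect.signature(fn)
    hints = get_type_hints(fn)
    for name, param in sig.parameters.items():
        if name not in inputs:
            # Parameters without a default value must be supplied.
            if param.default is inspect.Parameter.empty:
                problems.append(f"missing input: {name}")
            continue
        expected = hints.get(name)
        # Only check plain classes such as str or int; parameterized generics are skipped.
        if (
            get_origin(expected) is None
            and isinstance(expected, type)
            and not isinstance(inputs[name], expected)
        ):
            problems.append(
                f"{name}: expected {expected.__name__}, got {type(inputs[name]).__name__}"
            )
    for name in inputs:
        if name not in sig.parameters:
            problems.append(f"unexpected input: {name}")
    return problems


# Plain function standing in for the workflow's signature (hypothetical).
def wf(name: str) -> str:
    return f"hello {name}"


print(check_inputs(wf, {"name": "Kermit"}))  # → []
print(check_inputs(wf, {"nme": "Kermit"}))
```

A typo in a key shows up as both a missing and an unexpected input, which is usually the quickest way to spot it.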

Note

You can also launch workflows with flytectl; learn more in the User Guide.

Running a launch plan

Similar to workflows, you can run launch plans with FlyteRemote. If you have a LaunchPlan defined in your Python runtime environment, you can execute it directly:

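from flytekit import LaunchPlan

from workflows.example import wf

# create (or retrieve) a launch plan that supplies a default value for `name`
launch_plan = LaunchPlan.get_or_create(
    wf, name="launch_plan", default_inputs={"name": "Elmo"},
)

# no inputs are required because the launch plan provides a default
execution = remote.execute(launch_plan, inputs={})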
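Because the launch plan supplies default_inputs, it can be executed with an empty inputs dictionary, and any value you do pass for a name overrides its default. The hypothetical helper below models that merge with plain dictionaries; it is a simplified sketch, not Flyte's actual input-resolution logic.

```python
from typing import Any, Dict


def resolve_inputs(default_inputs: Dict[str, Any], inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Merge execution inputs over launch plan defaults (hypothetical model)."""
    # Later keys win, so explicitly passed inputs replace the defaults.
    return {**default_inputs, **inputs}


defaults = {"name": "Elmo"}
print(resolve_inputs(defaults, {}))                  # → {'name': 'Elmo'}
print(resolve_inputs(defaults, {"name": "Kermit"}))  # → {'name': 'Kermit'}
```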

Running a task

You can also run individual tasks on a Flyte cluster using FlyteRemote. If you have access to the @task-decorated function in your Python runtime environment, you can import and execute it directly:

from workflows.example import say_hello

execution = remote.execute(say_hello, inputs={"name": "Kermit"})

Note

You can also launch tasks with flytectl; learn more in the User Guide.

Fetching inputs and outputs of an execution

By default, FlyteRemote.execute is non-blocking: it returns as soon as the execution has been launched. Pass wait=True to make it block until the task or workflow completes.

Print out the Flyte console URL corresponding to your execution with:

print(f"Execution url: {remote.generate_console_url(execution)}")

While the execution is running, use the sync() method to update the local execution object with its remote state:

synced_execution = remote.sync(execution)
print(synced_execution.inputs)  # print out the inputs

You can also wait for the execution after you’ve launched it and access the outputs:

completed_execution = remote.wait(execution)
print(completed_execution.outputs)  # print out the outputs
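Conceptually, a blocking wait is a loop that re-syncs the execution until it reaches a terminal state. The sketch below models that pattern so it runs without a cluster: FakeExecution, fake_sync, and poll_until_done are hypothetical names, the phase names are illustrative, and this is not flytekit's implementation. Against a real cluster, use remote.wait or wait=True instead.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List

TERMINAL_PHASES = {"SUCCEEDED", "FAILED", "ABORTED", "TIMED_OUT"}


@dataclass
class FakeExecution:
    """Hypothetical stand-in for an execution whose phase changes over time."""
    phases: List[str]
    outputs: Dict[str, str] = field(default_factory=dict)

    @property
    def is_done(self) -> bool:
        return self.phases[0] in TERMINAL_PHASES


def fake_sync(execution: FakeExecution) -> FakeExecution:
    # Each sync advances to the next recorded phase, mimicking fresh remote state.
    if len(execution.phases) > 1:
        execution.phases.pop(0)
    return execution


def poll_until_done(
    execution: FakeExecution,
    sync: Callable[[FakeExecution], FakeExecution],
    poll_interval: float = 0.0,
    timeout: float = 5.0,
) -> FakeExecution:
    """Re-sync until the execution is terminal, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while not execution.is_done:
        if time.monotonic() > deadline:
            raise TimeoutError("execution did not finish in time")
        time.sleep(poll_interval)
        execution = sync(execution)
    return execution


execution = FakeExecution(phases=["QUEUED", "RUNNING", "SUCCEEDED"], outputs={"o0": "hello Kermit"})
done = poll_until_done(execution, fake_sync)
print(done.phases[0], done.outputs)  # → SUCCEEDED {'o0': 'hello Kermit'}
```

The timeout matters in practice: without one, a stuck execution would block the calling script indefinitely.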

Scheduling a launch plan

Finally, you can create a LaunchPlan that’s scheduled to run at a particular cadence by specifying the schedule argument:

from flytekit import LaunchPlan, CronSchedule

from workflows.example import wf


launch_plan = LaunchPlan.get_or_create(
    wf,
    name="wf_launchplan",
    # run this launch plan every minute
    schedule=CronSchedule(schedule="*/1 * * * *"),
    default_inputs={"name": "Elmo"},
)
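The expression */1 * * * * uses standard five-field cron syntax (minute, hour, day of month, month, day of week), where */N in the minute field means every N minutes, counted from minute 0 of each hour. To make that concrete, the hypothetical helper next_fire_times below computes upcoming fire times for minute-step expressions only; it is a toy, not a cron parser, and not how Flyte's scheduler works internally.

```python
from datetime import datetime, timedelta
from typing import List


def next_fire_times(expr: str, after: datetime, count: int) -> List[datetime]:
    """Return the next `count` fire times for a '*/N * * * *' cron expression.

    Hypothetical toy helper: only the minute-step form is supported.
    """
    fields = expr.split()
    if len(fields) != 5 or not fields[0].startswith("*/") or fields[1:] != ["*"] * 4:
        raise ValueError(f"unsupported expression: {expr!r}")
    step = int(fields[0][2:])
    if step < 1:
        raise ValueError("step must be at least 1")
    # Start at the first whole minute strictly after `after`.
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    times: List[datetime] = []
    while len(times) < count:
        # Steps count from minute 0 of each hour, so */7 fires at :00, :07, ..., :56.
        if t.minute % step == 0:
            times.append(t)
        t += timedelta(minutes=1)
    return times


for t in next_fire_times("*/15 * * * *", datetime(2024, 1, 1, 9, 7, 30), 3):
    print(t)
```

Note that cron fire times line up with clock boundaries regardless of when the schedule was evaluated.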

You can also specify a fixed-rate interval:

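from datetime import timedelta

from flytekit import FixedRate, LaunchPlan

from workflows.example import wf


launch_plan = LaunchPlan.get_or_create(
    wf,
    # a distinct name, so this doesn't clash with the cron launch plan above
    name="wf_launchplan_fixed_rate",
    # run this launch plan once every minute
    schedule=FixedRate(duration=timedelta(minutes=1)),
    default_inputs={"name": "Elmo"},
)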
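A fixed-rate schedule is defined only by its interval. The hypothetical helper below models it by spacing fire times by that interval from a given start instant; which start instant Flyte's scheduler actually uses is outside the scope of this guide, so the sketch simply takes it as a parameter.

```python
from datetime import datetime, timedelta
from typing import List


def fixed_rate_times(start: datetime, interval: timedelta, count: int) -> List[datetime]:
    """Return start + interval, start + 2 * interval, ... (hypothetical model)."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    return [start + interval * (i + 1) for i in range(count)]


start = datetime(2024, 1, 1, 9, 7, 30)
for t in fixed_rate_times(start, timedelta(minutes=1), 3):
    print(t)
```

Unlike a cron expression, nothing here snaps the times to whole minutes: with a start of 09:07:30, the runs land at 09:08:30, 09:09:30, and so on.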

Passing in the scheduled kick-off time

Suppose that your workflow is parameterized to take in a datetime argument, which determines how the workflow is executed (e.g. reading in data using the current date).

You can specify a kickoff_time_input_arg in the schedule so that Flyte automatically passes the scheduled kick-off time into the workflow as that input:

from datetime import datetime
from flytekit import workflow, LaunchPlan, CronSchedule


@workflow
def process_data_wf(kickoff_time: datetime):
    # read data and process it based on kickoff_time
    ...

process_data_lp = LaunchPlan.get_or_create(
    process_data_wf,
    name="process_data_lp",
    schedule=CronSchedule(
        schedule="*/1 * * * *",
        kickoff_time_input_arg="kickoff_time",
    )
)
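Inside the workflow, the kick-off time is typically used to pick which slice of data to process. The hypothetical helpers below show one such use in plain Python: deriving a date partition name and a time window that ends at the kick-off time. The partition naming scheme and the one-minute lookback are assumptions for illustration; in a real project this logic would live in a task called by process_data_wf.

```python
from datetime import datetime, timedelta
from typing import Tuple


def window_for(
    kickoff_time: datetime, lookback: timedelta = timedelta(minutes=1)
) -> Tuple[datetime, datetime]:
    """Return the [start, end) window of data a run should process (hypothetical)."""
    # With the every-minute schedule, consecutive kick-off times are one minute apart,
    # so a one-minute lookback yields non-overlapping windows.
    return kickoff_time - lookback, kickoff_time


def partition_for(kickoff_time: datetime) -> str:
    """Return a date-based partition name such as 'dt=2024-01-01' (hypothetical scheme)."""
    return f"dt={kickoff_time:%Y-%m-%d}"


kickoff = datetime(2024, 1, 1, 9, 15)
print(partition_for(kickoff))  # → dt=2024-01-01
print(window_for(kickoff))
```

Deriving the window from the passed-in kick-off time, rather than calling datetime.now() inside a task, keeps each run tied to its scheduled slot even if it starts late.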

Registering launch plans

Any of the methods described in the Registering workflows guide will register a launch plan, as long as it’s defined in one of the Python modules that you register to a Flyte backend.

Activating a schedule

Once you’ve registered your launch plan, you can use the FlyteRemote client or the flytectl CLI to activate the schedule:

launchplan_id = remote.fetch_launch_plan(name="process_data_lp").id
remote.client.update_launch_plan(launchplan_id, "ACTIVE")

Deactivating a schedule

Similarly, you can deactivate a launch plan with:

launchplan_id = remote.fetch_launch_plan(name="process_data_lp").id
remote.client.update_launch_plan(launchplan_id, "INACTIVE")
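Since activation and deactivation differ only in the state string, you might wrap both in one helper. set_schedule_active below is a hypothetical convenience function, not part of flytekit; it uses only the two calls shown above, and the usage example substitutes a stand-in object for FlyteRemote so the sketch runs without a cluster.

```python
from types import SimpleNamespace
from typing import Any, List, Tuple


def set_schedule_active(remote: Any, name: str, active: bool) -> None:
    """Activate or deactivate the launch plan called `name` (hypothetical helper)."""
    launchplan_id = remote.fetch_launch_plan(name=name).id
    remote.client.update_launch_plan(launchplan_id, "ACTIVE" if active else "INACTIVE")


# Stand-in for FlyteRemote that records calls instead of contacting a cluster.
calls: List[Tuple[str, str]] = []
fake_remote = SimpleNamespace(
    fetch_launch_plan=lambda name: SimpleNamespace(id=f"lp-id:{name}"),
    client=SimpleNamespace(
        update_launch_plan=lambda lp_id, state: calls.append((lp_id, state))
    ),
)

set_schedule_active(fake_remote, "process_data_lp", active=True)
set_schedule_active(fake_remote, "process_data_lp", active=False)
print(calls)  # → [('lp-id:process_data_lp', 'ACTIVE'), ('lp-id:process_data_lp', 'INACTIVE')]
```

With a real FlyteRemote object, the same call would be set_schedule_active(remote, "process_data_lp", active=True).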

What’s next?

In this guide, you learned how to:

  • Run tasks, workflows, and launch plans using FlyteRemote.

  • Create a cron or fixed-rate schedule to run a launch plan at a specified interval.

In the next guide, you’ll learn how to visualize tasks using Flyte Decks.