Running and scheduling workflows#
Flyte supports the development and debugging of tasks and workflows in a local setting, which increases the iteration speed of building out your data- or machine-learning-driven applications.
Running your workflows locally is great for the initial stages of testing, but what if you need to make sure that they run as intended on a Flyte cluster? And, what if you need to run them on a regular cadence?
In this guide we’ll cover how to run and schedule workflows for both development and production use cases.
Prerequisites
This guide assumes that you’ve completed the previous guides for Creating a Flyte project and Packaging and Registering Workflows.
Create a FlyteRemote object#
In “Running a workflow locally”, you saw how to run Flyte workflows with pyflyte run when working with standalone scripts.
Once you’re working with larger projects where you’ve registered workflows to a Flyte cluster, we recommend using the FlyteRemote client to run workflows from a Python runtime. First, let’s create a FlyteRemote object:
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote
remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)
Running a workflow#
You can run workflows using the FlyteRemote execute() method, which takes a dictionary of inputs that adheres to the interface defined by the workflow.
If you have access to the @workflow-decorated function in your Python runtime environment, you can import and execute it directly. Before executing it, you need to register the workflow:
pyflyte register wf.py
from workflows.example import wf
execution = remote.execute(
    wf,
    inputs={"name": "Kermit"},
)
Alternatively, execute a workflow by fetching a FlyteWorkflow object from the remote FlyteAdmin service. This object contains the metadata representing a Flyte workflow that exists on a Flyte cluster backend.
flyte_wf = remote.fetch_workflow(name="workflows.example.wf")
execution = remote.execute(flyte_wf, inputs={"name": "Kermit"})
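Because the inputs dictionary has to match the workflow's interface, a quick local pre-check can catch typos before anything is launched. The following is a minimal sketch using only the standard library. The helper name check_inputs is hypothetical, and it inspects a plain Python function with the same signature as the example workflow rather than a Flyte object; it is not how Flyte itself validates inputs.

```python
import inspect
from typing import Any, Callable, Dict, List


def check_inputs(fn: Callable[..., Any], inputs: Dict[str, Any]) -> List[str]:
    """Return a list of problems found when matching `inputs` to `fn`'s signature."""
    problems = []
    params = inspect.signature(fn).parameters
    # Unknown keys are usually typos in the inputs dictionary.
    for key in inputs:
        if key not in params:
            problems.append(f"unexpected input: {key}")
    for name, param in params.items():
        if name not in inputs:
            # A parameter without a default value must be supplied.
            if param.default is inspect.Parameter.empty:
                problems.append(f"missing input: {name}")
            continue
        expected = param.annotation
        # Only check plain classes such as str or int; skip missing
        # annotations and generics like List[int].
        if (
            expected is not inspect.Parameter.empty
            and isinstance(expected, type)
            and not isinstance(inputs[name], expected)
        ):
            problems.append(
                f"wrong type for {name}: expected {expected.__name__}, "
                f"got {type(inputs[name]).__name__}"
            )
    return problems


# Hypothetical stand-in with the same signature as the example workflow.
def wf(name: str) -> str:
    return f"Hello, {name}!"


print(check_inputs(wf, {"name": "Kermit"}))
print(check_inputs(wf, {"nme": "Kermit"}))
```

An empty list means the dictionary lines up with the signature; any other result lists the mismatches to fix before calling remote.execute.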
Note
You can also launch workflows via flytectl, which you can learn more about in the User Guide.
Running a launch plan#
Similar to workflows, you can run launch plans with FlyteRemote:
If you have a LaunchPlan defined in your Python runtime environment, you can execute it directly:
from flytekit import LaunchPlan
from workflows.example import wf
launch_plan = LaunchPlan.get_or_create(
    wf, name="launch_plan", default_inputs={"name": "Elmo"},
)
execution = remote.execute(launch_plan, inputs={})
Alternatively, execute a launch plan by fetching a FlyteLaunchPlan object from the remote FlyteAdmin service. This object contains the metadata representing a Flyte launch plan that exists on a Flyte cluster backend.
This example assumes that you’ve added a launch_plan with some default inputs to the example.py script and registered it to the backend:
flyte_launchplan = remote.fetch_launch_plan(name="workflows.example.launch_plan")
execution = remote.execute(flyte_launchplan, inputs={})
Running a task#
You can also run individual tasks on a Flyte cluster using FlyteRemote:
If you have access to the @task-decorated function in your Python runtime environment, you can import and execute it directly:
from workflows.example import say_hello
execution = remote.execute(say_hello, inputs={"name": "Kermit"})
Alternatively, execute a task by fetching a FlyteTask object from the remote FlyteAdmin service. This object contains the metadata representing a Flyte task that exists on a Flyte cluster backend.
flyte_task = remote.fetch_task(name="workflows.example.say_hello")
execution = remote.execute(flyte_task, inputs={"name": "Kermit"})
Note
You can also launch tasks via flytectl. Learn more in the User Guide.
Fetching inputs and outputs of an execution#
By default, FlyteRemote.execute is non-blocking, but you can also pass in wait=True to make it wait synchronously for the task or workflow to complete.
Print out the Flyte console URL corresponding to your execution with:
print(f"Execution url: {remote.generate_console_url(execution)}")
Synchronize the state of the Flyte execution object with the remote state during execution with the sync() method:
synced_execution = remote.sync(execution)
print(synced_execution.inputs) # print out the inputs
You can also wait for the execution after you’ve launched it and access the outputs:
completed_execution = remote.wait(execution)
print(completed_execution.outputs) # print out the outputs
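remote.wait covers the common case, but if you want to do other work between checks, or enforce your own timeout, a polling loop built on sync() is one option. The sketch below is duck-typed so that it runs without a cluster: the helper name poll_until_done is hypothetical, and it assumes the synced execution object exposes a boolean is_done attribute.

```python
import time
from typing import Any, Callable


def poll_until_done(
    sync: Callable[[Any], Any],
    execution: Any,
    interval_s: float = 5.0,
    timeout_s: float = 600.0,
) -> Any:
    """Call `sync` repeatedly until the execution reports that it is done."""
    deadline = time.monotonic() + timeout_s
    while True:
        # Refresh the local object with the remote state.
        execution = sync(execution)
        # Assumption: the execution object has a boolean `is_done` attribute.
        if execution.is_done:
            return execution
        if time.monotonic() >= deadline:
            raise TimeoutError("execution did not finish before the timeout")
        time.sleep(interval_s)
```

Under those assumptions, a call would look like poll_until_done(remote.sync, execution), after which the outputs can be read from the returned object.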
Scheduling a launch plan#
Finally, you can create a LaunchPlan that’s scheduled to run at a particular cadence by specifying the schedule argument:
from flytekit import LaunchPlan, CronSchedule
from workflows.example import wf
launch_plan = LaunchPlan.get_or_create(
    wf,
    name="wf_launchplan",
    # run this launchplan every minute
    schedule=CronSchedule(schedule="*/1 * * * *"),
    default_inputs={"name": "Elmo"},
)
You can also specify a fixed-rate interval:
from datetime import timedelta
from flytekit import FixedRate
launch_plan = LaunchPlan.get_or_create(
    wf,
    name="wf_launchplan",
    schedule=FixedRate(duration=timedelta(minutes=1)),
    default_inputs={"name": "Elmo"},
)
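To get a feel for what a cron expression such as "*/1 * * * *" means, it can help to compute the next few kick-off times by hand. The sketch below follows standard cron semantics for a step in the minute field and uses only the standard library. The helper name next_kickoffs is hypothetical, and the scheduler backing your Flyte deployment may differ in details.

```python
from datetime import datetime, timedelta
from typing import List


def next_kickoffs(step_minutes: int, after: datetime, count: int) -> List[datetime]:
    """List the next `count` times matching a `*/step_minutes * * * *` expression."""
    if not 1 <= step_minutes <= 59:
        raise ValueError("step_minutes must be between 1 and 59")
    # Cron fires on whole minutes, so start from the next minute boundary.
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    kickoffs = []
    while len(kickoffs) < count:
        # `*/N` in the minute field matches minutes 0, N, 2N, ... within each hour.
        if candidate.minute % step_minutes == 0:
            kickoffs.append(candidate)
        candidate += timedelta(minutes=1)
    return kickoffs


for t in next_kickoffs(25, datetime(2024, 1, 1, 12, 40), 3):
    print(t.isoformat())
```

With a step of 25, the matches are 12:50, 13:00, and 13:25: the step restarts at the top of each hour, so the gaps are not uniform. When you need evenly spaced runs at an interval that does not divide 60, a fixed-rate schedule is likely the better fit.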
Passing in the scheduled kick-off time#
Suppose that your workflow is parameterized to take in a datetime argument, which determines how the workflow is executed (e.g. reading in data using the current date).
You can specify a kickoff_time_input_arg in the schedule so that it automatically passes the cron schedule kick-off time into the workflow:
from datetime import datetime
from flytekit import workflow, LaunchPlan, CronSchedule
@workflow
def process_data_wf(kickoff_time: datetime):
    # read data and process it based on kickoff_time
    ...
process_data_lp = LaunchPlan.get_or_create(
    process_data_wf,
    name="process_data_lp",
    schedule=CronSchedule(
        schedule="*/1 * * * *",
        kickoff_time_input_arg="kickoff_time",
    )
)
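As an illustration of how the kick-off time might drive the work inside such a workflow, the sketch below maps a kick-off time to a date-partitioned data location. The helper name data_path_for, the bucket, and the year/month/day layout are all hypothetical; in a real project this logic would typically live in a task called by process_data_wf.

```python
from datetime import datetime


def data_path_for(kickoff_time: datetime, prefix: str = "s3://my-bucket/events") -> str:
    """Build a date-partitioned path from the scheduled kick-off time."""
    # Hypothetical layout: <prefix>/<year>/<month>/<day>
    return f"{prefix}/{kickoff_time:%Y/%m/%d}"


print(data_path_for(datetime(2024, 3, 5, 0, 1)))
```

Deriving the path from the scheduled time, rather than from the wall clock at run time, keeps delayed or re-run executions reading the same partition.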
Registering launch plans#
Any of the methods described in the Registering workflows guide will register a launch plan, as long as it’s defined in one of the Python modules that you register to a Flyte backend.
Activating a schedule#
Once you’ve registered your launch plan, you can use the FlyteRemote client or the flytectl CLI to activate the schedule:
launchplan_id = remote.fetch_launch_plan(name="process_data_lp").id
remote.client.update_launch_plan(launchplan_id, "ACTIVE")
flytectl update launchplan -p flyteexamples -d development \
    process_data_lp --version <VERSION> --activate
Deactivating a schedule#
Similarly, you can deactivate a launch plan with:
launchplan_id = remote.fetch_launch_plan(name="process_data_lp").id
remote.client.update_launch_plan(launchplan_id, "INACTIVE")
flytectl update launchplan -p flyteexamples -d development \
    process_data_lp --version <VERSION> --archive
What’s next?#
In this guide, you learned how to:
- Run tasks, workflows, and launch plans using FlyteRemote.
- Create a cron schedule to run a launch plan at a specified time interval.
In the next guide, you’ll learn how to visualize tasks using Flyte Decks.