How do I use Flyte scheduling?¶
Usage¶
Launch plans can be set to run automatically on a schedule if the Flyte platform is properly configured. You can even use the scheduled kick-off time in your workflow as an input.
There are two types of schedules: cron schedules and fixed rate intervals.
Cron Schedules¶
Cron expression strings use the AWS syntax. These are validated at launch plan registration time.
from flytekit import CronSchedule
schedule = CronSchedule(
    cron_expression="0 10 * * ? *",
)
This schedule object can then be used in the construction of a flytekit.LaunchPlan.
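Because the AWS syntax uses six fields rather than the five of standard Unix cron, it can help to sanity-check an expression before registering a launch plan. The following is a minimal sketch and not flytekit's own validation: the helper name `split_aws_cron` is hypothetical, and it checks only the field count and the AWS rule that day-of-month and day-of-week cannot both carry a value.

```python
# Field order used by AWS cron expressions (six fields, unlike five-field Unix cron).
AWS_CRON_FIELDS = ("minutes", "hours", "day_of_month", "month", "day_of_week", "year")


def split_aws_cron(expression: str) -> dict:
    """Hypothetical helper: split an AWS-style cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(AWS_CRON_FIELDS):
        raise ValueError(
            f"expected {len(AWS_CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    fields = dict(zip(AWS_CRON_FIELDS, parts))
    # AWS does not allow day-of-month and day-of-week to both carry a value;
    # at least one of them has to be the "?" placeholder.
    if "?" not in (fields["day_of_month"], fields["day_of_week"]):
        raise ValueError("one of day-of-month or day-of-week must be '?'")
    return fields


fields = split_aws_cron("0 10 * * ? *")
print(fields["minutes"], fields["hours"])  # minute 0 of hour 10
```

A check like this is deliberately shallow; the authoritative validation still happens at launch plan registration time.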
Complete cron example¶
For example, take the following workflow:
from flytekit import workflow
@workflow
def MyWorkflow(an_input: int, another_input: int = 10):
    ...
The above can be run on a cron schedule every 5 minutes like so:
from flytekit import CronSchedule, LaunchPlan
cron_lp = LaunchPlan.create(
    "my_cron_lp",
    MyWorkflow,
    # In AWS syntax, "0/5" in the minutes field means every 5 minutes starting at minute 0.
    schedule=CronSchedule(cron_expression="0/5 * * * ? *"),
    fixed_inputs={"an_input": 5},
)
Fixed Rate Intervals¶
Fixed rate schedules will run at the specified interval.
from flytekit import FixedRate
from datetime import timedelta
schedule = FixedRate(duration=timedelta(minutes=10))
Complete fixed rate example¶
from datetime import datetime
from flytekit import workflow
@workflow
def MyOtherWorkflow(triggered_time: datetime, an_input: int, another_input: int = 10):
    ...
To run MyOtherWorkflow every 5 minutes with a value set for an_input and the scheduled execution time assigned to the triggered_time input, you could define the following launch plan:
from datetime import timedelta
from flytekit import FixedRate, LaunchPlan
fixed_rate_lp = LaunchPlan.create(
    "my_fixed_rate_lp",
    MyOtherWorkflow,
    # Note that kickoff_time_input_arg matches the workflow input we defined above: triggered_time
    schedule=FixedRate(duration=timedelta(minutes=5), kickoff_time_input_arg="triggered_time"),
    fixed_inputs={"an_input": 3},
)
Please see a more complete example in the cookbook.
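To make the role of kickoff_time_input_arg concrete, here is a small sketch of how the inputs of one scheduled execution could be assembled. It is illustrative only and does not reproduce Flyte's actual implementation; the function `build_execution_inputs` and the sample timestamp are assumptions made for this example.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_execution_inputs(
    fixed_inputs: Dict[str, Any],
    scheduled_time: datetime,
    kickoff_time_input_arg: Optional[str] = None,
) -> Dict[str, Any]:
    """Illustrative only: combine fixed inputs with the scheduled kick-off time."""
    inputs = dict(fixed_inputs)  # copy so the caller's dict is left untouched
    if kickoff_time_input_arg is not None:
        # The scheduled time is passed under the input name chosen by the user.
        inputs[kickoff_time_input_arg] = scheduled_time
    return inputs


inputs = build_execution_inputs(
    fixed_inputs={"an_input": 3},
    scheduled_time=datetime(2021, 1, 1, 0, 5, tzinfo=timezone.utc),
    kickoff_time_input_arg="triggered_time",
)
print(sorted(inputs))  # ['an_input', 'triggered_time']
```

In this picture, another_input is not set by the launch plan, so the workflow falls back to its default value of 10.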
Activating a schedule¶
Once you’ve initialized your launch plan, don’t forget to set it to active so that the schedule is run.
You can use pyflyte from within the container:
pyflyte lp -p {{ your project }} -d {{ your domain }} activate-all
Alternatively, use flyte-cli to view and activate launch plans:
flyte-cli -i -h localhost:30081 -p flyteexamples -d development list-launch-plan-versions
Extract the URN returned for the launch plan you’re interested in and make the call to activate it:
flyte-cli update-launch-plan -i -h localhost:30081 --state active -u {{ urn }}
Verify your active launch plans:
flyte-cli -i -h localhost:30081 -p flyteexamples -d development list-active-launch-plans
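If you activate launch plans from a script, the flyte-cli call above can be assembled as an argument list. This is a sketch under assumptions: the function name `activate_launch_plan_cmd` is hypothetical, the URN shown is a made-up placeholder, and the flags are exactly the ones used in the commands above. The snippet only builds and prints the command; it does not run it.

```python
import shlex
from typing import List


def activate_launch_plan_cmd(
    urn: str, host: str = "localhost:30081", insecure: bool = True
) -> List[str]:
    """Hypothetical helper: build the flyte-cli activation command shown above."""
    cmd = ["flyte-cli", "update-launch-plan"]
    if insecure:
        cmd.append("-i")  # same insecure flag as in the examples above
    cmd += ["-h", host, "--state", "active", "-u", urn]
    return cmd


# Placeholder URN; use the one returned by list-launch-plan-versions.
cmd = activate_launch_plan_cmd("{{ urn }}")
print(shlex.join(cmd))
```

Keeping the command as a list makes it straightforward to hand to `subprocess.run` without shell quoting issues.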
Platform Configuration Changes¶
Scheduling features require additional infrastructure, which has to be created and configured.
Setting up scheduled workflows¶
In order to run workflow executions based on user-specified schedules you’ll need to fill out the top-level scheduler portion of the flyteadmin application configuration.
In particular you’ll need to configure the two components responsible for scheduling workflows and processing schedule event triggers.
Note this functionality is currently only supported for AWS installs.
Event Scheduler¶
In order to schedule workflow executions, you’ll need to set up an AWS SQS queue. A standard type queue should suffice. The flyteadmin event scheduler creates AWS CloudWatch event rules that invoke your SQS queue as a target.
With that in mind, let’s take a look at an example eventScheduler config section and dive into what each value represents:
scheduler:
  eventScheduler:
    scheme: "aws"
    region: "us-east-1"
    scheduleRole: "arn:aws:iam::{{ YOUR ACCOUNT ID }}:role/{{ ROLE }}"
    targetName: "arn:aws:sqs:us-east-1:{{ YOUR ACCOUNT ID }}:{{ YOUR QUEUE NAME }}"
    scheduleNamePrefix: "flyte"
- scheme: in this case, because AWS is the only cloud back-end supported for scheduling workflows, only "aws" is a valid value. By default, the no-op scheduler is used.
- region: this specifies which region the initialized AWS clients will use when creating CloudWatch rules.
- scheduleRole: this is the IAM role ARN with permissions set to Allow for the following actions:
  - events:PutRule
  - events:PutTargets
  - events:DeleteRule
  - events:RemoveTargets
- targetName: this is the ARN for the SQS queue you’ve allocated to scheduling workflows.
- scheduleNamePrefix: this is an entirely optional prefix used when creating schedule rules. Because of AWS naming length restrictions, scheduled rule names are a random hash; a shared prefix makes these names more readable and indicates who generated the rules.
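The permissions listed for scheduleRole can be expressed as an IAM policy document. The sketch below generates one with the standard library; the function name `scheduler_policy` is hypothetical, and the `"*"` resource is a placeholder assumption that you would normally narrow to the rules your installation creates.

```python
import json

# The four CloudWatch Events actions listed for scheduleRole above.
SCHEDULER_ACTIONS = [
    "events:PutRule",
    "events:PutTargets",
    "events:DeleteRule",
    "events:RemoveTargets",
]


def scheduler_policy(resource: str = "*") -> dict:
    """Hypothetical helper: IAM policy document allowing the scheduler actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(SCHEDULER_ACTIONS),
                # Assumption: "*" is only a placeholder, restrict it where possible.
                "Resource": resource,
            }
        ],
    }


print(json.dumps(scheduler_policy(), indent=2))
```

The printed JSON can be attached to the role referenced by scheduleRole using whatever IAM tooling you already rely on.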
Workflow Executor¶
Scheduled events that fire must be handled by the workflow executor, which subscribes to triggered events from the SQS queue you configured above.
Caution
Failure to configure a workflow executor will result in all your scheduled events piling up silently without ever kicking off workflow executions.
Again, let’s break down a sample config:
scheduler:
  eventScheduler:
    ...
  workflowExecutor:
    scheme: "aws"
    region: "us-east-1"
    scheduleQueueName: "{{ YOUR QUEUE NAME }}"
    accountId: "{{ YOUR ACCOUNT ID }}"
- scheme: in this case, because AWS is the only cloud back-end supported for executing scheduled workflows, only "aws" is a valid value. By default, the no-op executor is used.
- region: this specifies which region AWS clients will use when creating an SQS subscriber client.
- scheduleQueueName: this is the name of the SQS queue you’ve allocated to scheduling workflows.
- accountId: your AWS account ID.
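Since a mismatch between the two sections leaves events queued without any executions, a quick consistency check can be worthwhile. The following sketch assumes the scheduler configuration has already been loaded into a dictionary; the function `check_scheduler_config` and the sample account ID and queue name are hypothetical, and flyteadmin itself is not claimed to perform this check.

```python
def check_scheduler_config(config: dict) -> list:
    """Hypothetical check: the executor should read the queue the scheduler targets."""
    scheduler = config["scheduler"]
    event = scheduler["eventScheduler"]
    executor = scheduler["workflowExecutor"]

    # SQS ARNs have the form arn:aws:sqs:<region>:<account id>:<queue name>.
    arn_parts = event["targetName"].split(":")
    if len(arn_parts) != 6 or arn_parts[:3] != ["arn", "aws", "sqs"]:
        return ["targetName is not an SQS ARN"]
    region, account_id, queue_name = arn_parts[3:]

    problems = []
    if executor["scheduleQueueName"] != queue_name:
        problems.append("scheduleQueueName does not match the queue in targetName")
    if executor["accountId"] != account_id:
        problems.append("accountId does not match the account in targetName")
    if executor["region"] != region:
        problems.append("workflowExecutor region does not match the queue region")
    return problems


# Sample values are placeholders, not real resources.
sample = {
    "scheduler": {
        "eventScheduler": {
            "scheme": "aws",
            "region": "us-east-1",
            "targetName": "arn:aws:sqs:us-east-1:123456789012:flyte-schedules",
        },
        "workflowExecutor": {
            "scheme": "aws",
            "region": "us-east-1",
            "scheduleQueueName": "flyte-schedules",
            "accountId": "123456789012",
        },
    }
}
print(check_scheduler_config(sample))  # [] when the two sections agree
```

Note that accountId is compared as a string, so quote it in the configuration to avoid it being read as a number.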