Flyte Native Scheduler Architecture

Introduction

Any workflow engine needs to support scheduled executions. Flyte meets this need with a built-in native scheduler that supports both fixed-rate and cron-based schedules. The workflow author specifies the schedule when creating the launchplan, and activates or deactivates it through the admin APIs exposed for the launchplan.

Characteristics

  1. Cloud provider independent

  2. Standard cron support

  3. Independently scalable

  4. Small memory footprint

  5. Schedules run as lightweight goroutines

  6. Fault tolerant and available

  7. Supported in the sandbox environment

Components

Schedule Management

This component supports creating, activating, and deactivating schedules. Each schedule is tied to a launchplan and is versioned the same way. A schedule is created, or its state is changed to activated or deactivated, whenever the admin API is invoked for it with an ACTIVE or INACTIVE state. This can be done through flytectl or through any other client calling the gRPC API. The API mirrors the launchplan API, which ensures that at most one schedule is active for a given launchplan.
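To make the "at most one active schedule per launchplan" rule concrete, here is a minimal Go sketch. ScheduleStore, ScheduleKey, and SetState are hypothetical names used for illustration, not the flyteadmin API, and the store is an in-memory map rather than the DB.

```go
package main

import (
	"fmt"
	"sort"
)

// ScheduleState mirrors the ACTIVE/INACTIVE states accepted by the admin API.
type ScheduleState int

const (
	Inactive ScheduleState = iota
	Active
)

// ScheduleKey identifies one version of a launchplan's schedule.
// Hypothetical shape: schedules are versioned alongside their launchplan.
type ScheduleKey struct {
	Project, Domain, Name, Version string
}

// ScheduleStore is a hypothetical in-memory stand-in for the schedule table.
type ScheduleStore struct {
	states map[ScheduleKey]ScheduleState
}

func NewScheduleStore() *ScheduleStore {
	return &ScheduleStore{states: make(map[ScheduleKey]ScheduleState)}
}

// sameLaunchPlan reports whether two keys belong to the same launchplan.
func sameLaunchPlan(a, b ScheduleKey) bool {
	return a.Project == b.Project && a.Domain == b.Domain && a.Name == b.Name
}

// SetState creates the schedule on first use and records its state. Activating
// one version deactivates all other versions of the same launchplan, so at most
// one schedule is active per launchplan.
func (s *ScheduleStore) SetState(key ScheduleKey, state ScheduleState) {
	if state == Active {
		for k := range s.states {
			if sameLaunchPlan(k, key) && k.Version != key.Version {
				s.states[k] = Inactive
			}
		}
	}
	s.states[key] = state
}

// ActiveVersions lists the active versions for a launchplan, sorted.
func (s *ScheduleStore) ActiveVersions(project, domain, name string) []string {
	probe := ScheduleKey{Project: project, Domain: domain, Name: name}
	var versions []string
	for k, st := range s.states {
		if st == Active && sameLaunchPlan(k, probe) {
			versions = append(versions, k.Version)
		}
	}
	sort.Strings(versions)
	return versions
}

func main() {
	store := NewScheduleStore()
	v1 := ScheduleKey{"flytesnacks", "development", "daily_lp", "v1"}
	v2 := ScheduleKey{"flytesnacks", "development", "daily_lp", "v2"}
	store.SetState(v1, Active)
	store.SetState(v2, Active) // deactivates v1
	fmt.Println(store.ActiveVersions("flytesnacks", "development", "daily_lp"))
}
```

Activating v2 implicitly deactivates v1, so only one version remains active for the launchplan.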

Scheduler

This component is a singleton responsible for reading schedules from the DB and running them at the cadence each schedule defines. The finest granularity supported is one minute, for both cron and fixed-rate schedules. The scheduler is expected to run as a single replica, or two at most during a redeployment. Running multiple replicas only duplicates work, not executions: each execution for a given scheduleTime has a unique identifier derived from the schedule name and the scheduled time, and the admin's idempotency on that identifier prevents duplicates on the admin side. The scheduler runs continuously in a loop, reading updated schedule entries from the data store and adding or removing schedules accordingly. Removing a schedule does not affect in-flight goroutines already launched by the scheduler, so the behavior of those executions is undefined.

Snapshoter

This component is responsible for writing a snapshot of the state of all schedules to a persistent store at a regular cadence. It stores the snapshot in the DB in GOB format, and the snapshot is versioned. The snapshot is a map[string]time.Time that maps schedule names to their last execution times. At startup, the snapshot is bootstrapped from the data store and loaded into memory. The Scheduler uses this snapshot to run any schedules that were missed.

CatchupAll-System

This component runs at startup and catches up all schedules to the current time, time.Now(). New runs for the schedules are sent to the admin in parallel with the catch-up. Any failure during catch-up is treated as a hard failure and stops the scheduler; on restart, the scheduler tries to catch up again from the last snapshotted data.

GOCronWrapper

This component is responsible for locking in the time at which each scheduled job is invoked and adding the jobs to the cron scheduler. It wraps the underlying cron framework for fixed-rate and cron schedules and creates an in-memory representation of the scheduled job functions. The scheduler can schedule a function that takes a scheduleTime parameter, so when the function is invoked it knows which scheduled time the invocation is for. The scheduler supports standard five-field cron expressions, whose fields are, in order: minute, hour, day of month, month, and day of week.
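The five-field layout can be illustrated with a small Go helper that only checks the field count and assigns fields by position. SplitCron and CronFields are hypothetical names; a real scheduler would delegate full parsing (ranges, steps, lists, names) to a cron library.

```go
package main

import (
	"fmt"
	"strings"
)

// CronFields holds the five standard cron fields in their required order.
type CronFields struct {
	Minute, Hour, DayOfMonth, Month, DayOfWeek string
}

// SplitCron checks that expr has exactly five whitespace-separated fields and
// assigns them by position. It does not validate the contents of each field.
func SplitCron(expr string) (CronFields, error) {
	f := strings.Fields(expr)
	if len(f) != 5 {
		return CronFields{}, fmt.Errorf("cron expression %q has %d fields, want 5", expr, len(f))
	}
	return CronFields{Minute: f[0], Hour: f[1], DayOfMonth: f[2], Month: f[3], DayOfWeek: f[4]}, nil
}

func main() {
	// At minute 0 of every hour from 9 through 17, Monday through Friday.
	fields, err := SplitCron("0 9-17 * * 1-5")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", fields)
}
```

A six-field expression with a leading seconds field, as some cron dialects accept, is rejected here, which matches the minute-level granularity described for the scheduler.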

Job Executor

This component is responsible for sending the scheduled executions to flyteadmin. The job function accepts the scheduleTime and the schedule, which it uses to build an execution request for the admin. Each job function is tied to its schedule and runs in a separate goroutine according to the schedule's cadence.

Monitoring

The following metrics are published by the native scheduler for easier monitoring of the health of the system:

  1. JobFuncPanicCounter : count of crashes of the job functions executed by the scheduler

  2. JobScheduledFailedCounter : count of scheduling failures by the scheduler

  3. CatchupErrCounter : count of unsuccessful attempts to catch up on the schedules

  4. FailedExecutionCounter : count of unsuccessful attempts to fire executions of a schedule

  5. SuccessfulExecutionCounter : count of successful attempts to fire executions of a schedule