Flyte Native Scheduler Architecture

Introduction

Any workflow engine needs functionality to support scheduled executions. Flyte fulfills this using an in-built native scheduler, which schedules fixed rate and cron-based schedules. The workflow author specifies the schedule during the launchplan creation and activates or deactivates the schedule using the admin APIs exposed for the launch plan.

Characteristics

  1. Cloud provider independent

  2. Standard cron support

  3. Independently scalable

  4. Small memory footprint

  5. Schedules run as lightweight goroutines

  6. Fault tolerant and available

  7. Support in sandbox environment

Components

Schedule Management

This component supports creation/activation and deactivation of schedules. Each schedule is tied to a launch plan and is versioned in a similar manner. The schedule is created or its state is changed to activated/deactivated whenever the admin API is invoked for it with ACTIVE/INACTIVE state. This is done either through flytectl or through any other client that calls the GRPC API. The API is similar to a launchplan, ensuring that only one schedule is active for a given launchplan.

Scheduler

This component is a singleton and is responsible for reading the schedules from the DB and running them at the cadence defined by the schedule. The lowest granularity supported is minutes for scheduling through both cron and fixed rate schedulers. The scheduler can run in one replica, two at the most during redeployment. Multiple replicas will only duplicate the work, since each execution for a scheduleTime will have a unique identifier derived from the schedule name and the time of the schedule. The idempotency aspect of the admin for the same identifier prevents duplication on the admin side. The scheduler runs continuously in a loop reading the updated schedule entries in the data store and adding or removing the schedules. Removing a schedule will not alter the in-flight goroutines launched by the scheduler. Thus, the behavior of these executions is undefined.

Snapshoter

This component is responsible for writing the snapshot state of all schedules at a regular cadence to a persistent store. It uses a DB to store the GOB format of the snapshot, which is versioned. The snapshot is a map[string]time.Time, which stores a map of schedule names to their last execution times. During bootup, the snapshot is bootstrapped from the data store and loaded into memory. The Scheduler uses this snapshot to schedule any missed schedules.

CatchupAll-System

This component runs at bootup and catches up all the schedules to current time, i.e., time.Now(). New runs for the schedules are sent to the admin in parallel. Any failure in catching up is considered a hard failure and stops the scheduler. The rerun tries to catchup from the last snapshot of data.

GOCronWrapper

This component is responsible for locking in the time for the scheduled job to be invoked and adding those to the cron scheduler. It is a wrapper around this framework for fixed rate and cron schedules that creates in-memory representation of the scheduled job functions. The scheduler schedules a function with scheduleTime parameters. When this scheduled function is invoked, the scheduleTime parameters provide the current schedule time used by the scheduler. This scheduler supports standard cron scheduling which has 5 fields. It requires 5 entries representing minute, hour, day of month, month and day of week, in that order.

Job Executor

The job executor component is responsible for sending the scheduled executions to FlyteAdmin. The job function accepts scheduleTime and the schedule which is used to create an execution request to the admin. Each job function is tied to the schedule which is executed in a separate goroutine in accordance with the schedule cadence.

Monitoring

To monitor the system health, the following metrics are published by the native scheduler:

  1. JobFuncPanicCounter : count of crashes of the job functions executed by the scheduler.

  2. JobScheduledFailedCounter : count of scheduling failures by the scheduler.

  3. CatchupErrCounter : count of unsuccessful attempts to catchup on the schedules.

  4. FailedExecutionCounter : count of unsuccessful attempts to fire executions of a schedule.

  5. SuccessfulExecutionCounter : count of successful attempts to fire executions of a schedule.