Source code for flytekit.core.schedule

"""
.. autoclass:: flytekit.core.schedule.CronSchedule
   :noindex:

"""

import datetime
import re
from typing import Optional, Protocol, Union

import croniter
from flyteidl.admin import schedule_pb2
from google.protobuf import message as google_message

from flytekit.models import schedule as _schedule_models


class LaunchPlanTriggerBase(Protocol):
    """Structural interface for launch plan triggers that expose ``to_flyte_idl``."""

    def to_flyte_idl(self, *args, **kwargs) -> google_message.Message:
        """Return the protobuf message for this trigger (implemented by conforming classes)."""
        ...


# Duplicates flytekit.common.schedules.Schedule to avoid using the ExtendedSdkType metaclass.
class CronSchedule(_schedule_models.Schedule):
    """
    Use this when you have a launch plan that you want to run on a cron expression.

    This uses standard `cron format <https://docs.flyte.org/en/latest/concepts/schedules.html#cron-expression-table>`__
    in case where you are using default native scheduler using the schedule attribute.

    .. code-block:: python

        CronSchedule(
            schedule="*/1 * * * *",  # Following schedule runs every min
        )

    See the :std:ref:`User Guide <cookbook:cron schedules>` for further examples.
    """

    # Aliases accepted by ``schedule`` without being run through croniter (compared case-insensitively).
    _VALID_CRON_ALIASES = [
        "hourly",
        "hours",
        "@hourly",
        "daily",
        "days",
        "@daily",
        "weekly",
        "weeks",
        "@weekly",
        "monthly",
        "months",
        "@monthly",
        "annually",
        "@annually",
        "yearly",
        "years",
        "@yearly",
    ]

    # Not a perfect regex but good enough and simple to reason about
    _OFFSET_PATTERN = re.compile(r"([-+]?)P([-+0-9YMWD]+)?(T([-+0-9HMS.,]+)?)?")

    def __init__(
        self,
        cron_expression: Optional[str] = None,
        schedule: Optional[str] = None,
        offset: Optional[str] = None,
        kickoff_time_input_arg: Optional[str] = None,
    ):
        """
        :param str cron_expression: Deprecated. This was a cron expression in AWS style and shouldn't be used in
            case of native scheduler. Passing any non-empty value raises ``AssertionError``; use ``schedule``
            instead.
        :param str schedule: This takes a cron alias (see ``_VALID_CRON_ALIASES``) or a croniter parseable schedule.
            Only one of this or ``cron_expression`` can be set, not both. This uses standard
            `cron format <https://docs.flyte.org/en/latest/concepts/schedules.html#cron-expression>`_
            and is supported by native scheduler
        :param str offset: Optional ISO 8601 duration string. It is checked against ``_OFFSET_PATTERN`` and then
            passed, together with ``schedule``, to ``Schedule.CronSchedule``.
        :param str kickoff_time_input_arg: This is a convenient argument to use when your code needs to know what time
            a run was kicked off. Supply the name of the input argument of your workflow to this argument here. Note
            that until Flyte has an atomic clock, there could be a few seconds here and there. That is, if your run is
            supposed to kick off at 3pm UTC every Weds, it may actually be 15:00:02 or something. Example ::

                @workflow
                def my_wf(kickoff_time: datetime): ...

                schedule = CronSchedule(
                    schedule="*/1 * * * *",
                    kickoff_time_input_arg="kickoff_time")

        :raises AssertionError: if ``cron_expression`` is supplied (it is deprecated).
        :raises ValueError: if ``schedule`` is neither a known alias nor parseable by croniter, or if ``offset``
            does not match ``_OFFSET_PATTERN``.
        """
        if cron_expression:
            raise AssertionError(
                "cron_expression is deprecated and should not be used. Use `schedule` instead. "
                "See the documentation for more information."
            )
        if schedule is not None:
            CronSchedule._validate_schedule(schedule)
        if offset is not None:
            CronSchedule._validate_offset(offset)

        # The nested CronSchedule model is only built when a schedule was actually given.
        cron_schedule = _schedule_models.Schedule.CronSchedule(schedule, offset) if schedule is not None else None
        super().__init__(
            kickoff_time_input_arg,
            cron_expression=cron_expression,
            cron_schedule=cron_schedule,
        )

    @staticmethod
    def _validate_expression(cron_expression: str):
        """
        Ensures that the set value is a valid cron string.  We use the format used in Cloudwatch and the best
        explanation can be found here:
        https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html#CronExpressions

        :param str cron_expression: cron expression
        :raises ValueError: if the expression does not have six fields, has no ``?`` in either the day-of-month
            or day-of-week field, or its first five fields are rejected by croniter.
        """
        # We use the croniter lib to validate our cron expression.  Since on the admin side we use Cloudwatch,
        # we have a couple checks in order to line up Cloudwatch with Croniter.
        tokens = cron_expression.split()
        if len(tokens) != 6:
            raise ValueError(
                "Cron expression is invalid.  A cron expression must have 6 fields.  Cron expressions are in the "
                "format of: `minute hour day-of-month month day-of-week year`. "
                "Use `schedule` for 5 fields cron expression.  Received: `{}`".format(cron_expression)
            )

        if tokens[2] != "?" and tokens[4] != "?":
            raise ValueError(
                "Scheduled string is invalid.  A cron expression must have a '?' for either day-of-month or "
                "day-of-week.  Please specify '?' for one of those fields.  Cron expressions are in the format of: "
                "minute hour day-of-month month day-of-week year.\n\n"
                "For more information: "
                "https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html#CronExpressions"
            )

        # Cut to 5 fields and just assume year field is good because croniter treats the 6th field as seconds.
        # TODO: Parse this field ourselves and check
        five_field_expression = " ".join(token.replace("?", "*") for token in tokens[:5])
        try:
            croniter.croniter(five_field_expression)
        except Exception as exc:
            # Chain the croniter failure so the root cause stays visible in the traceback.
            raise ValueError(
                "Scheduled string is invalid.  The cron expression was found to be invalid."
                f" Provided cron expr: {cron_expression}"
            ) from exc

    @staticmethod
    def _validate_schedule(schedule: str):
        """
        Check that ``schedule`` is a known cron alias or an expression croniter can parse.

        :param str schedule: cron alias (matched case-insensitively) or cron expression
        :raises ValueError: if ``schedule`` is not an alias and croniter rejects it.
        """
        if schedule.lower() in CronSchedule._VALID_CRON_ALIASES:
            return
        try:
            croniter.croniter(schedule)
        except Exception as exc:
            # Chain the croniter failure so the root cause stays visible in the traceback.
            raise ValueError(
                "Schedule is invalid. It must be set to either a cron alias or valid cron expression."
                f" Provided schedule: {schedule}"
            ) from exc

    @staticmethod
    def _validate_offset(offset: str):
        """
        Check that ``offset`` fully matches ``_OFFSET_PATTERN``.

        :param str offset: candidate ISO 8601 duration string
        :raises ValueError: if the whole string does not match the pattern.
        """
        if CronSchedule._OFFSET_PATTERN.fullmatch(offset) is None:
            raise ValueError("Offset is invalid. It must be an ISO 8601 duration. Provided offset: {}".format(offset))
class FixedRate(_schedule_models.Schedule):
    """
    Use this class to schedule a fixed-rate interval for a launch plan.

    .. code-block:: python

        from datetime import timedelta

        FixedRate(duration=timedelta(minutes=10))

    See the :std:ref:`fixed rate intervals` chapter in the cookbook for additional usage examples.
    """

    def __init__(self, duration: datetime.timedelta, kickoff_time_input_arg: Optional[str] = None):
        """
        :param datetime.timedelta duration: interval between runs; converted with ``_translate_duration``
        :param str kickoff_time_input_arg: forwarded unchanged to the parent ``Schedule`` constructor
        """
        fixed_rate = self._translate_duration(duration)
        super().__init__(kickoff_time_input_arg, rate=fixed_rate)

    @staticmethod
    def _translate_duration(duration: datetime.timedelta):
        """
        Convert a timedelta into a fixed rate using the coarsest unit (day, hour, minute) that divides it evenly.

        :param datetime.timedelta duration: timedelta between runs
        :rtype: flytekit.models.schedule.Schedule.FixedRate
        :raises AssertionError: if the duration is not a whole number of minutes.
        """
        seconds_per_minute = 60
        seconds_per_hour = seconds_per_minute * 60
        seconds_per_day = seconds_per_hour * 24

        has_sub_minute_part = duration.microseconds != 0 or duration.seconds % seconds_per_minute != 0
        if has_sub_minute_part:
            raise AssertionError(
                f"Granularity of less than a minute is not supported for FixedRate schedules.  Received: {duration}"
            )

        # Past the guard the duration is a whole number of minutes, so every division below is exact.
        whole_seconds = int(duration.total_seconds())
        rate_units = _schedule_models.Schedule.FixedRateUnit

        if whole_seconds % seconds_per_day == 0:
            return _schedule_models.Schedule.FixedRate(whole_seconds // seconds_per_day, rate_units.DAY)
        if whole_seconds % seconds_per_hour == 0:
            return _schedule_models.Schedule.FixedRate(whole_seconds // seconds_per_hour, rate_units.HOUR)
        return _schedule_models.Schedule.FixedRate(whole_seconds // seconds_per_minute, rate_units.MINUTE)
class OnSchedule(LaunchPlanTriggerBase):
    """Launch plan trigger that wraps a ``CronSchedule`` or ``FixedRate``."""

    def __init__(self, schedule: Union[CronSchedule, FixedRate]):
        """
        :param Union[CronSchedule, FixedRate] schedule: Either a cron or a fixed rate
        """
        self._schedule = schedule

    def to_flyte_idl(self) -> schedule_pb2.Schedule:
        """Return whatever the wrapped schedule's own ``to_flyte_idl`` produces."""
        wrapped_schedule = self._schedule
        return wrapped_schedule.to_flyte_idl()