import abc
import json
import re
from typing import Dict
from flyteidl.admin import common_pb2 as _common_pb2
from flyteidl.core import literals_pb2 as _literals_pb2
from google.protobuf import json_format as _json_format
from google.protobuf import struct_pb2 as _struct
class FlyteABCMeta(abc.ABCMeta):
def __instancecheck__(cls, instance):
if cls in type(instance).__mro__:
return True
return super(FlyteABCMeta, cls).__instancecheck__(instance)
class FlyteType(FlyteABCMeta):
def __repr__(cls):
return cls.short_class_string()
def __str__(cls):
return cls.verbose_class_string()
def short_class_string(cls):
"""
:rtype: Text
"""
return super(FlyteType, cls).__repr__()
def verbose_class_string(cls):
"""
:rtype: Text
"""
return cls.short_class_string()
@abc.abstractmethod
def from_flyte_idl(cls, idl_object):
pass
class FlyteIdlEntity(object, metaclass=FlyteType):
def __eq__(self, other):
return isinstance(other, FlyteIdlEntity) and other.to_flyte_idl() == self.to_flyte_idl()
def __ne__(self, other):
return not (self == other)
def __repr__(self):
return self.short_string()
def __str__(self):
return self.verbose_string()
def __hash__(self):
return hash(self.to_flyte_idl().SerializeToString(deterministic=True))
def short_string(self):
"""
:rtype: Text
"""
literal_str = re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()
type_str = type(self).__name__
return f"<FlyteLiteral({type_str}) {literal_str}>"
def verbose_string(self):
"""
:rtype: Text
"""
return self.short_string()
def serialize_to_string(self) -> str:
return self.to_flyte_idl().SerializeToString()
@property
def is_empty(self):
return len(self.to_flyte_idl().SerializeToString()) == 0
@abc.abstractmethod
def to_flyte_idl(self):
pass
class FlyteCustomIdlEntity(FlyteIdlEntity):
@classmethod
def from_flyte_idl(cls, idl_object):
"""
:param _struct.Struct idl_object:
:return: FlyteCustomIdlEntity
"""
return cls.from_dict(idl_dict=_json_format.MessageToDict(idl_object))
def to_flyte_idl(self):
return _json_format.Parse(json.dumps(self.to_dict()), _struct.Struct())
@abc.abstractmethod
def from_dict(self, idl_dict):
pass
@abc.abstractmethod
def to_dict(self):
"""
Converts self to a dictionary.
:rtype: dict[Text, T]
"""
pass
class NamedEntityIdentifier(FlyteIdlEntity):
def __init__(self, project, domain, name=None):
"""
:param Text project: The name of the project in which this entity lives.
:param Text domain: The name of the domain within the project.
:param Text name: [Optional] The name of the entity within the namespace of the project and domain.
"""
self._project = project
self._domain = domain
self._name = name
@property
def project(self):
"""
The name of the project in which this entity lives.
:rtype: Text
"""
return self._project
@property
def domain(self):
"""
The name of the domain within the project.
:rtype: Text
"""
return self._domain
@property
def name(self):
"""
The name of the entity within the namespace of the project and domain.
:rtype: Text
"""
return self._name
def to_flyte_idl(self):
"""
Stores object to a Flyte-IDL defined protobuf.
:rtype: flyteidl.admin.common_pb2.NamedEntityIdentifier
"""
# We use the kwarg constructor of the protobuf and setting name=None is equivalent to not setting it at all
return _common_pb2.NamedEntityIdentifier(project=self.project, domain=self.domain, name=self.name)
@classmethod
def from_flyte_idl(cls, idl_object):
"""
:param flyteidl.admin.common_pb2.NamedEntityIdentifier idl_object:
:rtype: NamedEntityIdentifier
"""
return cls(idl_object.project, idl_object.domain, idl_object.name)
class EmailNotification(FlyteIdlEntity):
def __init__(self, recipients_email):
"""
:param list[Text] recipients_email:
"""
self._recipients_email = recipients_email
@property
def recipients_email(self):
"""
:rtype: list[Text]
"""
return self._recipients_email
def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.common_pb2.EmailNotification
"""
return _common_pb2.EmailNotification(recipients_email=self.recipients_email)
@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.common_pb2.EmailNotification pb2_object:
:rtype: EmailNotification
"""
return cls(pb2_object.recipients_email)
class SlackNotification(FlyteIdlEntity):
def __init__(self, recipients_email):
"""
:param list[Text] recipients_email:
"""
self._recipients_email = recipients_email
@property
def recipients_email(self):
"""
:rtype: list[Text]
"""
return self._recipients_email
def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.common_pb2.SlackNotification
"""
return _common_pb2.SlackNotification(recipients_email=self.recipients_email)
@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.common_pb2.SlackNotification pb2_object:
:rtype: EmailNotification
"""
return cls(pb2_object.recipients_email)
class PagerDutyNotification(FlyteIdlEntity):
def __init__(self, recipients_email):
"""
:param list[Text] recipients_email:
"""
self._recipients_email = recipients_email
@property
def recipients_email(self):
"""
:rtype: list[Text]
"""
return self._recipients_email
def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.common_pb2.PagerDutyNotification
"""
return _common_pb2.PagerDutyNotification(recipients_email=self.recipients_email)
@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.common_pb2.PagerDutyNotification pb2_object:
:rtype: EmailNotification
"""
return cls(pb2_object.recipients_email)
class Notification(FlyteIdlEntity):
def __init__(
self,
phases,
email: EmailNotification = None,
pager_duty: PagerDutyNotification = None,
slack: SlackNotification = None,
):
"""
Represents a structure for notifications based on execution status.
:param list[int] phases: A list of phases to which users can associate the notifications.
:param EmailNotification email: [Optional] Specify this for an email notification.
:param PagerDutyNotification email: [Optional] Specify this for a PagerDuty notification.
:param SlackNotification email: [Optional] Specify this for a Slack notification.
"""
self._phases = phases
self._email = email
self._pager_duty = pager_duty
self._slack = slack
@property
def phases(self):
"""
A list of phases to which users can associate the notifications.
:rtype: list[int]
"""
return self._phases
@property
def email(self):
"""
:rtype: EmailNotification
"""
return self._email
@property
def pager_duty(self):
"""
:rtype: PagerDutyNotification
"""
return self._pager_duty
@property
def slack(self):
"""
:rtype: SlackNotification
"""
return self._slack
def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.common_pb2.Notification
"""
return _common_pb2.Notification(
phases=self.phases,
email=self.email.to_flyte_idl() if self.email else None,
pager_duty=self.pager_duty.to_flyte_idl() if self.pager_duty else None,
slack=self.slack.to_flyte_idl() if self.slack else None,
)
@classmethod
def from_flyte_idl(cls, p):
"""
:param flyteidl.admin.common_pb2.Notification p:
:rtype: Notification
"""
return cls(
p.phases,
email=EmailNotification.from_flyte_idl(p.email) if p.HasField("email") else None,
pager_duty=PagerDutyNotification.from_flyte_idl(p.pager_duty) if p.HasField("pager_duty") else None,
slack=SlackNotification.from_flyte_idl(p.slack) if p.HasField("slack") else None,
)
[docs]
class Labels(FlyteIdlEntity):
def __init__(self, values):
"""
Label values to be applied to a workflow execution resource.
:param dict[Text, Text] values:
"""
self._values = values
@property
def values(self):
return self._values
[docs]
def to_flyte_idl(self):
"""
:rtype: dict[Text, Text]
"""
return _common_pb2.Labels(values={k: v for k, v in self.values.items()})
[docs]
@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.common_pb2.Labels pb2_object:
:rtype: Labels
"""
return cls({k: v for k, v in pb2_object.values.items()})
[docs]
class Annotations(FlyteIdlEntity):
def __init__(self, values):
"""
Annotation values to be applied to a workflow execution resource.
:param dict[Text, Text] values:
"""
self._values = values
@property
def values(self):
return self._values
[docs]
def to_flyte_idl(self):
"""
:rtype: _common_pb2.Annotations
"""
return _common_pb2.Annotations(values={k: v for k, v in self.values.items()})
[docs]
@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.common_pb2.Annotations pb2_object:
:rtype: Annotations
"""
return cls({k: v for k, v in pb2_object.values.items()})
class UrlBlob(FlyteIdlEntity):
def __init__(self, url, bytes):
"""
:param Text url:
:param int bytes:
"""
self._url = url
self._bytes = bytes
@property
def url(self):
"""
:rtype: Text
"""
return self._url
@property
def bytes(self):
"""
:rtype: int
"""
return self._bytes
def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.common_pb2.UrlBlob
"""
return _common_pb2.UrlBlob(url=self.url, bytes=self.bytes)
@classmethod
def from_flyte_idl(cls, pb):
"""
:param flyteidl.admin.common_pb2.UrlBlob pb:
:rtype: UrlBlob
"""
return cls(url=pb.url, bytes=pb.bytes)
[docs]
class AuthRole(FlyteIdlEntity):
def __init__(self, assumable_iam_role=None, kubernetes_service_account=None):
"""Auth configuration for IAM or K8s service account.
Either one or both of the assumable IAM role and/or the K8s service account can be set.
:param Text assumable_iam_role: IAM identity with set permissions policies.
:param Text kubernetes_service_account: Provides an identity for workflow execution resources.
Flyte deployment administrators are responsible for handling permissions as they
relate to the service account.
"""
self._assumable_iam_role = assumable_iam_role
self._kubernetes_service_account = kubernetes_service_account
@property
def assumable_iam_role(self):
"""
The IAM role to execute the workflow with
:rtype: Text
"""
return self._assumable_iam_role
@property
def kubernetes_service_account(self):
"""
The kubernetes service account to execute the workflow with
:rtype: Text
"""
return self._kubernetes_service_account
[docs]
def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.launch_plan_pb2.Auth
"""
return _common_pb2.AuthRole(
assumable_iam_role=self.assumable_iam_role if self.assumable_iam_role else None,
kubernetes_service_account=self.kubernetes_service_account if self.kubernetes_service_account else None,
)
[docs]
@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.launch_plan_pb2.Auth pb2_object:
:rtype: Auth
"""
return cls(
assumable_iam_role=pb2_object.assumable_iam_role,
kubernetes_service_account=pb2_object.kubernetes_service_account,
)
class RawOutputDataConfig(FlyteIdlEntity):
def __init__(self, output_location_prefix):
"""
:param Text output_location_prefix: Location of offloaded data for things like S3, etc.
"""
self._output_location_prefix = output_location_prefix
@property
def output_location_prefix(self):
return self._output_location_prefix
def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.common_pb2.Auth
"""
return _common_pb2.RawOutputDataConfig(output_location_prefix=self.output_location_prefix)
@classmethod
def from_flyte_idl(cls, pb2):
return cls(output_location_prefix=pb2.output_location_prefix)
class Envs(FlyteIdlEntity):
def __init__(self, envs: Dict[str, str]):
self._envs = envs
@property
def envs(self) -> Dict[str, str]:
return self._envs
def to_flyte_idl(self) -> _common_pb2.Envs:
return _common_pb2.Envs(values=[_literals_pb2.KeyValuePair(key=k, value=v) for k, v in self.envs.items()])
@classmethod
def from_flyte_idl(cls, pb2: _common_pb2.Envs) -> _common_pb2.Envs:
return cls(envs={kv.key: kv.value for kv in pb2.values})