"""
This module contains shadow entities for all Flyte entities as represented in Flyte Admin / Control Plane.
The goal is to enable easy access to, and manipulation of, these entities.
"""
from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Union
from flytekit import FlyteContext
from flytekit.core import constants as _constants
from flytekit.core import hash as _hash_mixin
from flytekit.core import hash as hash_mixin
from flytekit.core.promise import create_and_link_node_from_remote
from flytekit.exceptions import system as _system_exceptions
from flytekit.exceptions import user as _user_exceptions
from flytekit.loggers import logger
from flytekit.models import interface as _interface_models
from flytekit.models import launch_plan as _launch_plan_model
from flytekit.models import launch_plan as _launch_plan_models
from flytekit.models import launch_plan as launch_plan_models
from flytekit.models import task as _task_model
from flytekit.models import task as _task_models
from flytekit.models.admin.workflow import WorkflowSpec
from flytekit.models.core import compiler as compiler_models
from flytekit.models.core import identifier as _identifier_model
from flytekit.models.core import identifier as id_models
from flytekit.models.core import workflow as _workflow_model
from flytekit.models.core import workflow as _workflow_models
from flytekit.models.core.identifier import Identifier
from flytekit.models.core.workflow import Node, WorkflowMetadata, WorkflowMetadataDefaults
from flytekit.models.interface import TypedInterface
from flytekit.models.literals import Binding
from flytekit.models.task import TaskSpec
from flytekit.remote import interface as _interface
from flytekit.remote import interface as _interfaces
from flytekit.remote.remote_callable import RemoteEntity
[docs]
class FlyteTask(hash_mixin.HashOnReferenceMixin, RemoteEntity, TaskSpec):
    """Remote Flyte task backed by a ``TaskTemplate``.

    The constructor arguments are packed into a ``TaskTemplate`` that is handed
    to the parent initializer; most properties below read straight from
    ``self.template``.
    """

    def __init__(
        self,
        id,
        type,
        metadata,
        interface,
        custom,
        container=None,
        task_type_version: int = 0,
        config=None,
        should_register: bool = False,
    ):
        # Assemble the template up front, then delegate to the parent initializer.
        task_template = _task_model.TaskTemplate(
            id,
            type,
            metadata,
            interface,
            custom,
            container=container,
            task_type_version=task_type_version,
            config=config,
        )
        super().__init__(template=task_template)
        self._should_register = should_register

    @property
    def id(self):
        """
        System-generated identifier that uniquely identifies the task.
        :rtype: flytekit.models.core.identifier.Identifier
        """
        return self.template.id

    @property
    def type(self):
        """
        Used to identify additional extensions for use by Propeller or the SDK.
        :rtype: Text
        """
        return self.template.type

    @property
    def metadata(self):
        """
        Information needed at runtime to determine behavior, such as whether outputs are discoverable,
        timeouts, and retries.
        :rtype: TaskMetadata
        """
        return self.template.metadata

    @property
    def interface(self):
        """
        Interface definition of this task.
        :rtype: flytekit.models.interface.TypedInterface
        """
        return self.template.interface

    @property
    def custom(self):
        """
        Arbitrary dictionary of metadata for custom plugins.
        :rtype: dict[Text, T]
        """
        return self.template.custom

    @property
    def task_type_version(self):
        """The ``task_type_version`` stored on the task template."""
        return self.template.task_type_version

    @property
    def container(self):
        """
        When not None, the target of execution should be a container.
        :rtype: Container
        """
        return self.template.container

    @property
    def config(self):
        """
        Arbitrary dictionary of metadata for parsing and handling custom plugins.
        :rtype: dict[Text, T]
        """
        return self.template.config

    @property
    def security_context(self):
        """The ``security_context`` stored on the task template."""
        return self.template.security_context

    @property
    def k8s_pod(self):
        """The ``k8s_pod`` stored on the task template."""
        return self.template.k8s_pod

    @property
    def sql(self):
        """The ``sql`` stored on the task template."""
        return self.template.sql

    @property
    def should_register(self) -> bool:
        """The ``should_register`` flag given at construction time."""
        return self._should_register

    @property
    def name(self) -> str:
        """Name component of the template's identifier."""
        return self.template.id.name

    @property
    def resource_type(self) -> _identifier_model.ResourceType:
        """Always ``ResourceType.TASK``."""
        return _identifier_model.ResourceType.TASK

    @property
    def entity_type_text(self) -> str:
        """Human-readable entity kind: ``"Task"``."""
        return "Task"
[docs]
class FlyteTaskNode(_workflow_model.TaskNode):
    """Task node that holds on to the ``FlyteTask`` a Flyte node needs to execute."""

    def __init__(self, flyte_task: FlyteTask):
        # The parent initializer is given None; ``reference_id`` is answered
        # from the wrapped task by the property override below.
        super().__init__(None)
        self._flyte_task = flyte_task

    @property
    def reference_id(self) -> id_models.Identifier:
        """Identifier of the wrapped task (a globally unique identifier for the task)."""
        wrapped_task = self._flyte_task
        return wrapped_task.id

    @property
    def flyte_task(self) -> FlyteTask:
        """The ``FlyteTask`` supplied at construction time."""
        return self._flyte_task
[docs]
class FlyteWorkflowNode(_workflow_model.WorkflowNode):
    """A class encapsulating a workflow that a Flyte node needs to execute.

    The node wraps either a ``FlyteWorkflow`` or a ``FlyteLaunchPlan`` (never both); the
    reference properties are derived from whichever one is present and are ``None`` otherwise.
    """

    def __init__(
        self,
        flyte_workflow: Optional[FlyteWorkflow] = None,
        flyte_launch_plan: Optional[FlyteLaunchPlan] = None,
    ):
        """
        :param flyte_workflow: the sub-workflow to wrap, if any
        :param flyte_launch_plan: the launch plan to wrap, if any
        :raises FlyteSystemException: if both a workflow and a launch plan are supplied
        """
        if flyte_workflow and flyte_launch_plan:
            raise _system_exceptions.FlyteSystemException(
                "FlyteWorkflowNode cannot be called with both a workflow and a launchplan specified, please pick "
                f"one. workflow: {flyte_workflow} launchPlan: {flyte_launch_plan}",
            )

        self._flyte_workflow = flyte_workflow
        self._flyte_launch_plan = flyte_launch_plan
        # Only the identifiers are handed to the parent model; the full entities stay on this object.
        super().__init__(
            launchplan_ref=self._flyte_launch_plan.id if self._flyte_launch_plan else None,
            sub_workflow_ref=self._flyte_workflow.id if self._flyte_workflow else None,
        )

    def __repr__(self) -> str:
        if self.flyte_workflow is not None:
            return f"FlyteWorkflowNode with workflow: {self.flyte_workflow}"
        return f"FlyteWorkflowNode with launch plan: {self.flyte_launch_plan}"

    @property
    def launchplan_ref(self) -> Optional[id_models.Identifier]:
        """Identifier of the wrapped launch plan, or None when no launch plan is wrapped."""
        return self._flyte_launch_plan.id if self._flyte_launch_plan else None

    @property
    def sub_workflow_ref(self) -> Optional[id_models.Identifier]:
        """Identifier of the wrapped workflow, or None when no workflow is wrapped."""
        return self._flyte_workflow.id if self._flyte_workflow else None

    @property
    def flyte_launch_plan(self) -> Optional[FlyteLaunchPlan]:
        """The launch plan supplied at construction time (may be None)."""
        return self._flyte_launch_plan

    @property
    def flyte_workflow(self) -> Optional[FlyteWorkflow]:
        """The workflow supplied at construction time (may be None)."""
        return self._flyte_workflow

    @classmethod
    def _promote_workflow(
        cls,
        wf: _workflow_models.WorkflowTemplate,
        sub_workflows: Optional[Dict[Identifier, _workflow_models.WorkflowTemplate]] = None,
        tasks: Optional[Dict[Identifier, FlyteTask]] = None,
        node_launch_plans: Optional[Dict[Identifier, launch_plan_models.LaunchPlanSpec]] = None,
    ) -> FlyteWorkflow:
        """Delegate to ``FlyteWorkflow.promote_from_model``, forwarding every argument by keyword."""
        return FlyteWorkflow.promote_from_model(
            wf,
            sub_workflows=sub_workflows,
            node_launch_plans=node_launch_plans,
            tasks=tasks,
        )
class FlyteBranchNode(_workflow_model.BranchNode):
    """Branch node whose then/else nodes are promoted via ``FlyteNode.promote_from_model``."""

    def __init__(self, if_else: _workflow_model.IfElseBlock):
        super().__init__(if_else)

    @classmethod
    def promote_from_model(
        cls,
        base_model: _workflow_model.BranchNode,
        sub_workflows: Dict[id_models.Identifier, _workflow_model.WorkflowTemplate],
        node_launch_plans: Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec],
        tasks: Dict[id_models.Identifier, FlyteTask],
        converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
    ) -> Tuple[FlyteBranchNode, Dict[id_models.Identifier, FlyteWorkflow]]:
        """Promote every node reachable from ``base_model.if_else`` and wrap the result.

        The ``_then_node`` of the primary case and of each entry in ``other`` is overwritten
        in place on the incoming model. The else node, when present, is promoted into a fresh
        ``IfElseBlock``. ``converted_sub_workflows`` is threaded through each promotion in
        order (case, others, else) and the final value is returned alongside the new node.
        """
        if_else = base_model.if_else

        def _promote(node, converted):
            # Single place that forwards the shared lookup tables.
            return FlyteNode.promote_from_model(node, sub_workflows, node_launch_plans, tasks, converted)

        # Primary case first ...
        if_else.case._then_node, converted_sub_workflows = _promote(
            if_else.case.then_node, converted_sub_workflows
        )

        # ... then each additional case, in declaration order ...
        for other_case in if_else.other:
            other_case._then_node, converted_sub_workflows = _promote(
                other_case.then_node, converted_sub_workflows
            )

        # ... and finally the optional else node.
        promoted_else = None
        if if_else.else_node:
            promoted_else, converted_sub_workflows = _promote(if_else.else_node, converted_sub_workflows)

        rebuilt_block = _workflow_model.IfElseBlock(if_else.case, if_else.other, promoted_else, if_else.error)
        return cls(rebuilt_block), converted_sub_workflows
class FlyteGateNode(_workflow_model.GateNode):
    """Gate node built from an existing ``GateNode`` model."""

    @classmethod
    def promote_from_model(cls, model: _workflow_model.GateNode):
        """Create an instance from the ``signal``, ``sleep`` and ``approve`` attributes of ``model``."""
        signal, sleep, approve = model.signal, model.sleep, model.approve
        return cls(signal, sleep, approve)
class FlyteArrayNode(_workflow_model.ArrayNode):
    """Array node built from an existing ``ArrayNode`` model."""

    @classmethod
    def promote_from_model(cls, model: _workflow_model.ArrayNode):
        """Create an instance from the private fields of ``model``, passed positionally.

        The fields are read in this order: ``_parallelism``, ``_node``,
        ``_min_success_ratio``, ``_min_successes``.
        """
        positional_fields = (
            model._parallelism,
            model._node,
            model._min_success_ratio,
            model._min_successes,
        )
        return cls(*positional_fields)
[docs]
class FlyteNode(_hash_mixin.HashOnReferenceMixin, _workflow_model.Node):
    """A class encapsulating a remote Flyte node."""

    def __init__(
        self,
        id,
        upstream_nodes,
        bindings,
        metadata,
        task_node: Optional[FlyteTaskNode] = None,
        workflow_node: Optional[FlyteWorkflowNode] = None,
        branch_node: Optional[FlyteBranchNode] = None,
        gate_node: Optional[FlyteGateNode] = None,
        array_node: Optional[FlyteArrayNode] = None,
    ):
        """
        :param id: node id, forwarded to the parent model
        :param upstream_nodes: nodes this node depends on; their ids are forwarded to the parent model
        :param bindings: forwarded to the parent model as ``inputs``
        :param metadata: forwarded to the parent model
        :param task_node: task target of this node, if any
        :param workflow_node: workflow / launch plan target of this node, if any
        :param branch_node: branch target of this node, if any
        :param gate_node: gate target of this node, if any
        :param array_node: array target of this node, if any
        :raises FlyteAssertion: if none of the target nodes is supplied
        """
        if not any((task_node, workflow_node, branch_node, gate_node, array_node)):
            raise _user_exceptions.FlyteAssertion(
                "An Flyte node must have one of task|workflow|branch|gate|array entity specified at once"
            )
        # TODO: Revisit flyte_branch_node and flyte_gate_node, should they be another type like Condition instead
        #       of a node?
        self._flyte_task_node = task_node
        # Resolve the entity this node ultimately points at: the task, the workflow (preferred) or
        # launch plan behind a workflow node, or otherwise the branch/gate/array node itself.
        if task_node:
            self._flyte_entity = task_node.flyte_task
        elif workflow_node:
            self._flyte_entity = workflow_node.flyte_workflow or workflow_node.flyte_launch_plan
        else:
            self._flyte_entity = branch_node or gate_node or array_node

        super().__init__(
            id=id,
            metadata=metadata,
            inputs=bindings,
            upstream_node_ids=[n.id for n in upstream_nodes],
            output_aliases=[],
            task_node=task_node,
            workflow_node=workflow_node,
            branch_node=branch_node,
            gate_node=gate_node,
            array_node=array_node,
        )
        self._upstream = upstream_nodes

    @property
    def task_node(self) -> Optional[FlyteTaskNode]:
        """The task node supplied at construction time (may be None)."""
        return self._flyte_task_node

    @property
    def flyte_entity(
        self,
    ) -> Optional[Union[FlyteTask, FlyteWorkflow, FlyteLaunchPlan, FlyteBranchNode, FlyteGateNode, FlyteArrayNode]]:
        """The entity resolved in ``__init__``.

        This is a task, workflow or launch plan, or the branch/gate/array node itself.
        It is None when a workflow node wraps neither a workflow nor a launch plan.
        """
        return self._flyte_entity

    @classmethod
    def _promote_task_node(cls, t: FlyteTask) -> FlyteTaskNode:
        """Delegate to ``FlyteTaskNode.promote_from_model``."""
        return FlyteTaskNode.promote_from_model(t)

    @classmethod
    def _promote_workflow_node(
        cls,
        wn: _workflow_model.WorkflowNode,
        sub_workflows: Dict[id_models.Identifier, _workflow_model.WorkflowTemplate],
        node_launch_plans: Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec],
        tasks: Dict[Identifier, FlyteTask],
        converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
    ) -> Tuple[FlyteWorkflowNode, Dict[id_models.Identifier, FlyteWorkflow]]:
        """Delegate to ``FlyteWorkflowNode.promote_from_model``, forwarding all arguments positionally."""
        return FlyteWorkflowNode.promote_from_model(
            wn,
            sub_workflows,
            node_launch_plans,
            tasks,
            converted_sub_workflows,
        )

    @property
    def upstream_nodes(self) -> List[FlyteNode]:
        """The upstream nodes supplied at construction time."""
        return self._upstream

    @property
    def upstream_node_ids(self) -> List[str]:
        """Ids of the upstream nodes, sorted, as a new list."""
        # sorted() already returns a fresh list, so no extra list() wrapper is needed.
        return sorted(n.id for n in self.upstream_nodes)

    def __repr__(self) -> str:
        return f"Node(ID: {self.id})"
[docs]
class FlyteWorkflow(_hash_mixin.HashOnReferenceMixin, RemoteEntity, WorkflowSpec):
    """A class encapsulating a remote Flyte workflow."""

    def __init__(
        self,
        id: id_models.Identifier,
        nodes: List[FlyteNode],
        interface,
        output_bindings,
        metadata,
        metadata_defaults,
        subworkflows: Optional[List[FlyteWorkflow]] = None,
        tasks: Optional[List[FlyteTask]] = None,
        launch_plans: Optional[Dict[id_models.Identifier, launch_plan_models.LaunchPlanSpec]] = None,
        compiled_closure: Optional[compiler_models.CompiledWorkflowClosure] = None,
        should_register: bool = False,
    ):
        """
        :param id: identifier of the workflow; its ``name`` becomes this object's ``name``
        :param nodes: nodes placed on the workflow template
        :param interface: forwarded to the workflow template
        :param output_bindings: forwarded to the workflow template as ``outputs``
        :param metadata: forwarded to the workflow template
        :param metadata_defaults: forwarded to the workflow template
        :param subworkflows: optional sub-workflows; their templates are passed to the parent spec
        :param tasks: optional tasks kept for ease of access
        :param launch_plans: optional launch plan specs kept for ease of access
        :param compiled_closure: optional compiled closure kept for ease of access
        :param should_register: stored and exposed through ``should_register``
        :raises FlyteAssertion: if any upstream node of any node has an id of None
        """
        # TODO: Remove check
        for node in nodes:
            for upstream in node.upstream_nodes:
                if upstream.id is None:
                    raise _user_exceptions.FlyteAssertion(
                        "Some nodes contained in the workflow were not found in the workflow description.  Please "
                        "ensure all nodes are either assigned to attributes within the class or an element in a "
                        "list, dict, or tuple which is stored as an attribute in the class."
                    )

        self._flyte_sub_workflows = subworkflows
        # The parent spec wants plain templates, not the FlyteWorkflow wrappers.
        template_subworkflows = [swf.template for swf in subworkflows] if subworkflows else []

        super().__init__(
            template=_workflow_models.WorkflowTemplate(
                id=id,
                metadata=metadata,
                metadata_defaults=metadata_defaults,
                interface=interface,
                nodes=nodes,
                outputs=output_bindings,
            ),
            sub_workflows=template_subworkflows,
        )
        self._flyte_nodes = nodes

        # Optional things that we save for ease of access when promoting from a model or CompiledWorkflowClosure
        self._tasks = tasks
        self._launch_plans = launch_plans
        self._compiled_closure = compiled_closure

        self._node_map = None
        self._name = id.name
        self._should_register = should_register

    @property
    def name(self) -> str:
        """Name taken from the identifier given at construction time."""
        return self._name

    @property
    def flyte_tasks(self) -> Optional[List[FlyteTask]]:
        """The tasks supplied at construction time (may be None)."""
        return self._tasks

    @property
    def should_register(self) -> bool:
        """The ``should_register`` flag given at construction time."""
        return self._should_register

    @property
    def flyte_sub_workflows(self) -> Optional[List[FlyteWorkflow]]:
        """The sub-workflows supplied at construction time (None when none were given)."""
        return self._flyte_sub_workflows

    @property
    def entity_type_text(self) -> str:
        """Human-readable entity kind: ``"Workflow"``."""
        return "Workflow"

    @property
    def resource_type(self):
        """Always ``ResourceType.WORKFLOW``."""
        return id_models.ResourceType.WORKFLOW

    @property
    def flyte_nodes(self) -> List[FlyteNode]:
        """The nodes supplied at construction time."""
        return self._flyte_nodes

    @property
    def id(self) -> Identifier:
        """
        This is an autogenerated id by the system. The id is globally unique across Flyte.
        """
        return self.template.id

    @property
    def metadata(self) -> WorkflowMetadata:
        """
        This contains information on how to run the workflow.
        """
        return self.template.metadata

    @property
    def metadata_defaults(self) -> WorkflowMetadataDefaults:
        """
        The ``metadata_defaults`` stored on the workflow template.
        """
        return self.template.metadata_defaults

    @property
    def interface(self) -> TypedInterface:
        """
        Defines a strongly typed interface for the Workflow (inputs, outputs). This can include some optional
        parameters.
        """
        return self.template.interface

    @property
    def nodes(self) -> List[Node]:
        """
        A list of nodes. In addition, "globals" is a special reserved node id that can be used to consume
        workflow inputs.
        """
        return self.template.nodes

    @property
    def outputs(self) -> List[Binding]:
        """
        A list of output bindings that specify how to construct workflow outputs. Bindings can
        pull node outputs or specify literals. All workflow outputs specified in the interface field must be bound
        in order for the workflow to be validated. A workflow has an implicit dependency on all of its nodes
        to execute successfully in order to bind final outputs.
        """
        return self.template.outputs

    @property
    def failure_node(self) -> Node:
        """
        Node failure_node: A catch-all node. This node is executed whenever the execution engine determines the
        workflow has failed. The interface of this node must match the Workflow interface with an additional input
        named "error" of type pb.lyft.flyte.core.Error.
        """
        return self.template.failure_node

    @classmethod
    def get_non_system_nodes(cls, nodes: List[_workflow_models.Node]) -> List[_workflow_models.Node]:
        """Return the nodes whose id is neither ``START_NODE_ID`` nor ``END_NODE_ID``, in input order."""
        # Build the exclusion set once instead of once per element.
        system_node_ids = {_constants.START_NODE_ID, _constants.END_NODE_ID}
        return [n for n in nodes if n.id not in system_node_ids]

    @classmethod
    def _promote_node(
        cls,
        model: _workflow_model.Node,
        sub_workflows: Optional[Dict[id_models.Identifier, _workflow_model.WorkflowTemplate]],
        node_launch_plans: Optional[Dict[id_models.Identifier, _launch_plan_model.LaunchPlanSpec]],
        tasks: Dict[id_models.Identifier, FlyteTask],
        converted_sub_workflows: Dict[id_models.Identifier, FlyteWorkflow],
    ) -> Tuple[Optional[FlyteNode], Dict[id_models.Identifier, FlyteWorkflow]]:
        """Delegate to ``FlyteNode.promote_from_model``, forwarding all arguments positionally."""
        return FlyteNode.promote_from_model(model, sub_workflows, node_launch_plans, tasks, converted_sub_workflows)

    @classmethod
    def _promote_task(cls, t: _task_models.TaskTemplate) -> FlyteTask:
        """Delegate to ``FlyteTask.promote_from_model``."""
        return FlyteTask.promote_from_model(t)
[docs]
class FlyteLaunchPlan(hash_mixin.HashOnReferenceMixin, RemoteEntity, _launch_plan_models.LaunchPlanSpec):
    """A class encapsulating a remote Flyte launch plan."""

    def __init__(self, id, *args, **kwargs):
        """
        :param id: identifier of the launch plan; its ``name`` becomes this object's ``name``
        :param args: forwarded unchanged to the parent initializer
        :param kwargs: forwarded unchanged to the parent initializer
        """
        super().__init__(*args, **kwargs)
        # Set all the attributes we expect this class to have
        self._id = id
        self._name = id.name

        # The interface is not set explicitly unless fetched in an engine context
        self._interface = None
        # If fetched when creating this object, can store it here.
        self._flyte_workflow = None

    @property
    def name(self) -> str:
        """Name taken from the identifier given at construction time."""
        return self._name

    @property
    def flyte_workflow(self) -> Optional[FlyteWorkflow]:
        """The stored workflow; initialised to None by ``__init__``."""
        return self._flyte_workflow

    @property
    def id(self) -> id_models.Identifier:
        """The identifier given at construction time."""
        return self._id

    @property
    def is_scheduled(self) -> bool:
        """Whether the launch plan's metadata carries a non-empty schedule.

        Returns True when the schedule has a cron expression, a rate with a value, or a
        cron schedule with a schedule string. Returns False otherwise, including when the
        metadata or its schedule is missing, rather than raising ``AttributeError``.
        """
        entity_metadata = self.entity_metadata
        schedule = entity_metadata.schedule if entity_metadata is not None else None
        if schedule is None:
            # No schedule object at all: nothing to inspect, so the plan is not scheduled.
            return False
        if schedule.cron_expression:
            return True
        if schedule.rate and schedule.rate.value:
            return True
        return bool(schedule.cron_schedule and schedule.cron_schedule.schedule)

    @property
    def workflow_id(self) -> id_models.Identifier:
        """The ``_workflow_id`` attribute (not assigned in this class; presumably set by the parent spec)."""
        return self._workflow_id

    @property
    def interface(self) -> Optional[_interface.TypedInterface]:
        """
        The interface is not technically part of the admin.LaunchPlanSpec in the IDL, however the workflow ID is, and
        from the workflow ID, fetch will fill in the interface. This is nice because then you can __call__ the
        object and get a node.
        """
        return self._interface

    @property
    def resource_type(self) -> id_models.ResourceType:
        """Always ``ResourceType.LAUNCH_PLAN``."""
        return id_models.ResourceType.LAUNCH_PLAN

    @property
    def entity_type_text(self) -> str:
        """Human-readable entity kind: ``"Launch Plan"``."""
        return "Launch Plan"

    def compile(self, ctx: FlyteContext, *args, **kwargs):
        """Create and link a node for this launch plan via ``create_and_link_node_from_remote``.

        Fixed-input names are passed as inputs that are not allowed, and default-input names
        as inputs that may be omitted. Positional ``args`` are accepted but not forwarded;
        only ``kwargs`` are passed on.
        """
        fixed_input_lits = self.fixed_inputs.literals or {}
        default_input_params = self.default_inputs.parameters or {}
        return create_and_link_node_from_remote(
            ctx,
            entity=self,
            _inputs_not_allowed=set(fixed_input_lits.keys()),
            _ignorable_inputs=set(default_input_params.keys()),
            **kwargs,
        )  # noqa

    def __repr__(self) -> str:
        return f"FlyteLaunchPlan(ID: {self.id} Interface: {self.interface}) - Spec {super().__repr__()})"