from dataclasses import dataclass
from typing import List, Optional, Union
from mashumaro.mixins.json import DataClassJSONMixin
from flytekit.models import task as task_models
[docs]
@dataclass
class Resources(DataClassJSONMixin):
"""
This class is used to specify both resource requests and resource limits.
.. code-block:: python
Resources(cpu="1", mem="2048") # This is 1 CPU and 2 KB of memory
Resources(cpu="100m", mem="2Gi") # This is 1/10th of a CPU and 2 gigabytes of memory
Resources(cpu=0.5, mem=1024) # This is 500m CPU and 1 KB of memory
# For Kubernetes-based tasks, pods use ephemeral local storage for scratch space, caching, and for logs.
# This allocates 1Gi of such local storage.
Resources(ephemeral_storage="1Gi")
.. note::
Persistent storage is not currently supported on the Flyte backend.
Please see the :std:ref:`User Guide <cookbook:customizing task resources>` for detailed examples.
Also refer to the `K8s conventions. <https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes>`__
"""
cpu: Optional[Union[str, int, float]] = None
mem: Optional[Union[str, int]] = None
gpu: Optional[Union[str, int]] = None
ephemeral_storage: Optional[Union[str, int]] = None
def __post_init__(self):
def _check_cpu(value):
if value is None:
return
if not isinstance(value, (str, int, float)):
raise AssertionError(f"{value} should be of type str or int or float")
def _check_others(value):
if value is None:
return
if not isinstance(value, (str, int)):
raise AssertionError(f"{value} should be of type str or int")
_check_cpu(self.cpu)
_check_others(self.mem)
_check_others(self.gpu)
_check_others(self.ephemeral_storage)
@dataclass
class ResourceSpec(DataClassJSONMixin):
requests: Resources
limits: Resources
_ResourceName = task_models.Resources.ResourceName
_ResourceEntry = task_models.Resources.ResourceEntry
def _convert_resources_to_resource_entries(resources: Resources) -> List[_ResourceEntry]: # type: ignore
resource_entries = []
if resources.cpu is not None:
resource_entries.append(_ResourceEntry(name=_ResourceName.CPU, value=str(resources.cpu)))
if resources.mem is not None:
resource_entries.append(_ResourceEntry(name=_ResourceName.MEMORY, value=str(resources.mem)))
if resources.gpu is not None:
resource_entries.append(_ResourceEntry(name=_ResourceName.GPU, value=str(resources.gpu)))
if resources.ephemeral_storage is not None:
resource_entries.append(
_ResourceEntry(name=_ResourceName.EPHEMERAL_STORAGE, value=str(resources.ephemeral_storage))
)
return resource_entries
def convert_resources_to_resource_model(
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
) -> task_models.Resources:
"""
Convert flytekit ``Resources`` objects to a Resources model
:param requests: Resource requests. Optional, defaults to ``None``
:param limits: Resource limits. Optional, defaults to ``None``
:return: The given resources as requests and limits
"""
request_entries = []
limit_entries = []
if requests is not None:
request_entries = _convert_resources_to_resource_entries(requests)
if limits is not None:
limit_entries = _convert_resources_to_resource_entries(limits)
return task_models.Resources(requests=request_entries, limits=limit_entries)