Extending Flyte#
Once you have the hang of the fundamentals of Flyte, you may find that you want to use it in ways that aren’t supported out of the box. Fortunately, Flyte provides multiple extension points that enable you to make it more powerful for your specific use cases.
This guide walks you through two of the many ways you can extend Flyte: customizing the type system and customizing tasks.
Customizing Flyte types#
Flyte has a rich type system that automatically handles the serialization and deserialization of objects so that when you pass data from one task to the next, you don’t have to write a bunch of boilerplate code.
However, the types that ship with Flyte or one of Flyte’s first-party integrations may not fulfill your needs. In this case, you’ll need to create your own.
The easiest way to do this is with the dataclasses module, which lets you compose several Flyte-supported types into a single object. For example, suppose you want to support a coordinates data type with arbitrary metadata:
import typing
from dataclasses import dataclass

from mashumaro.mixins.json import DataClassJSONMixin


@dataclass
class Coordinate(DataClassJSONMixin):
    """A custom type for coordinates with metadata attached."""

    x: float
    y: float
    metadata: typing.Dict[str, float]
You can then use this as a new type in your tasks and workflows:
from flytekit import task


@task
def generate_coordinates(num: int) -> typing.List[Coordinate]:
    """Generate some coordinates."""
    ...


@task
def subset_coordinates(
    coordinates: typing.List[Coordinate], x_min: float, x_max: float,
) -> typing.List[Coordinate]:
    """Select coordinates within a certain x-axis range."""
    ...
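The task bodies above are left elided. As a rough illustration only, the logic they describe could look like the following plain-Python sketch. The @task decorators and the DataClassJSONMixin base are omitted so the snippet runs without Flyte installed, and the random sampling, the `seed` parameter, and the `"weight"` metadata key are assumptions made for this example, not part of the original guide.

```python
import random
import typing
from dataclasses import dataclass


@dataclass
class Coordinate:
    """Plain-dataclass stand-in for the Coordinate type defined above."""

    x: float
    y: float
    metadata: typing.Dict[str, float]


def generate_coordinates(num: int, seed: int = 0) -> typing.List[Coordinate]:
    """Generate `num` pseudo-random coordinates in the unit square (hypothetical body)."""
    rng = random.Random(seed)  # seeded so repeated runs give the same points
    return [
        Coordinate(x=rng.random(), y=rng.random(), metadata={"weight": 1.0})
        for _ in range(num)
    ]


def subset_coordinates(
    coordinates: typing.List[Coordinate], x_min: float, x_max: float,
) -> typing.List[Coordinate]:
    """Select coordinates whose x value lies in the closed range [x_min, x_max]."""
    return [c for c in coordinates if x_min <= c.x <= x_max]


points = generate_coordinates(num=100)
subset = subset_coordinates(points, x_min=0.25, x_max=0.75)
```

Under Flyte, both functions would carry the @task decorator and Coordinate would keep its JSON mixin, so the list of coordinates could be passed from one task to the next.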
Important
The limitation of using the approach above is that you can only compose types that are already supported by Flyte.
To create entirely new types, you’ll need to use the TypeTransformer interface to explicitly handle the way in which the object is (a) serialized as a task output and (b) deserialized when passed into a task as an input.
See the User Guide for an example of a custom type.
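To make those two responsibilities concrete, here is a minimal plain-Python sketch of the (a)/(b) round trip for a type that is not a dataclass. It does not use flytekit: `Interval`, `interval_to_bytes`, and `interval_from_bytes` are hypothetical names, and JSON is an arbitrary choice of wire format. In a real TypeTransformer subclass, the same two steps would live in its serialization and deserialization methods (`to_literal` and `to_python_value` in flytekit).

```python
import json


class Interval:
    """A hypothetical custom type that is not a dataclass."""

    def __init__(self, start: float, end: float):
        if start > end:
            raise ValueError("start must not exceed end")
        self.start = start
        self.end = end

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, Interval)
            and (self.start, self.end) == (other.start, other.end)
        )


def interval_to_bytes(value: Interval) -> bytes:
    """(a) Serialize: turn the object into bytes when a task returns it."""
    return json.dumps({"start": value.start, "end": value.end}).encode("utf-8")


def interval_from_bytes(payload: bytes) -> Interval:
    """(b) Deserialize: rebuild the object when a task receives it as input."""
    data = json.loads(payload.decode("utf-8"))
    return Interval(start=data["start"], end=data["end"])


original = Interval(0.5, 2.0)
restored = interval_from_bytes(interval_to_bytes(original))
```

The property worth checking in any such pair is that deserializing a serialized value gives back an equal object, which is why `Interval` defines `__eq__` here.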
Customizing Flyte tasks#
The easiest way to extend Flyte tasks is to use Python decorators. Since Flyte tasks are simply functions, you can wrap the task function in a custom decorator before wrapping the entire function in the @task decorator.
For example, if we want to do something before and after the actual task function is invoked, we can do the following:
from functools import partial, wraps


def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        print("do something before")
        out = fn(*args, **kwargs)
        print("do something after")
        return out

    return wrapper
Then, making sure @task is the outermost decorator, we can modify the behavior of the task:
@task
@decorator
def add_one(x: int) -> int:
    return x + 1


add_one(x=10)
This approach allows you to call out to some external service or library before and after your task function body is executed. For example, this pattern is used by the MLflow integration via the mlflow_autolog() decorator to auto-log metrics during a model-training task.
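The @wraps(fn) call in the decorator is worth a second look. Flyte builds a task's interface from the function's type annotations, so the wrapper should expose the same signature as the function it wraps. The sketch below checks this using only the standard library; @task is left out so it runs without Flyte, and `bare_decorator` is a hypothetical counter-example that omits `wraps`.

```python
import inspect
from functools import wraps


def decorator(fn):
    @wraps(fn)  # copies metadata and sets wrapper.__wrapped__ = fn
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


def bare_decorator(fn):
    # Hypothetical counter-example: no functools.wraps.
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


@decorator
def add_one(x: int) -> int:
    return x + 1


@bare_decorator
def add_two(x: int) -> int:
    return x + 2


# With wraps, introspection sees the original typed signature.
wrapped_sig = inspect.signature(add_one)
# Without it, only the generic (*args, **kwargs) signature is visible.
bare_sig = inspect.signature(add_two)
```

Both functions still compute the right result when called directly; the difference only shows up for tools that inspect the signature.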
Note
You can stack multiple decorators on top of each other. Learn more in the User Guide.
Flyte also supports a setup-teardown pattern at the workflow level, which allows you to enable/disable services at the beginning/end of your workflows. See the User Guide for more details.
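Stacking works the same way as with ordinary Python functions: decorators are applied bottom-up, so the one written closest to the function runs innermost. The following sketch assumes two hypothetical decorators, `log_calls` and `validate_output`, and again omits @task (which would sit on top of the stack) so that it runs without Flyte.

```python
from functools import partial, wraps

events = []  # records the order in which the wrappers run


def log_calls(fn):
    """Hypothetical decorator: record when the wrapped function starts and ends."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        events.append("log: before")
        out = fn(*args, **kwargs)
        events.append("log: after")
        return out

    return wrapper


def validate_output(fn=None, *, floor=0):
    """Hypothetical parameterized decorator: reject outputs below `floor`."""
    if fn is None:
        # Called as @validate_output(floor=...): return a decorator awaiting fn.
        return partial(validate_output, floor=floor)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        events.append("validate: before")
        out = fn(*args, **kwargs)
        if out < floor:
            raise ValueError(f"output {out} is below the floor {floor}")
        events.append("validate: after")
        return out

    return wrapper


@log_calls                  # outermost: runs first and finishes last
@validate_output(floor=10)  # innermost: closest to the function body
def add_one(x: int) -> int:
    return x + 1


result = add_one(x=10)
```

Under Flyte, @task would be added above `@log_calls`, keeping it the outermost decorator as described earlier.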
The plugin hierarchy of needs#
The decorator approach is great for many surface-level use cases, but there are many more ways to customize Flyte tasks:
Task extensions that use pre-built containers. These are useful for tasks that don’t require user-defined code and simply rely on input parameters.
Task extensions that require user-built containers. These are needed when the task also requires user-defined code.
Tasks that can be implemented in other programming languages like R, Julia, etc. These are useful for leveraging highly optimized domain-specific libraries in other languages.
Task plugins that require implementing a backend plugin to leverage external services like SageMaker, Snowflake, BigQuery, etc.