Once you have a hang of the fundamentals of Flyte, you may find that you want to use it in ways that aren’t supported out-of-the-box. Fortunately, Flyte provides multiple extension points that enable you make it more powerful for your specific use cases.
This guide will walk you through two of the many different ways you can extend the Flyte type system and Flyte tasks.
Customizing Flyte Types#
Flyte has a rich type system that automatically handles the serialization and deserialization of objects so that when you pass data from one task to the next, you don’t have to write a bunch of boilerplate code.
However, the types that ship with Flyte or one of Flyte’s first-party integrations may not fulfill your needs. In this case, you’ll need to create your own.
The easiest way to do with is with the
dataclasses-json modules, which
let you compose several Flyte-supported types into a single object. For
example, suppose you want to support a coordinates data type with arbitrary
import typing from dataclasses import dataclass from dataclasses_json import dataclass_json @dataclass_json @dataclass class Coordinate: """A custom type for coordinates with metadata attached.""" x: float y: float metadata: typing.Dict[str, float]
You can then use this as a new type in your tasks and workflows:
from flytekit import task @task def generate_coordinates(num: int) -> typing.List[Coordinate]: """Generate some coordinates.""" ... @task def subset_coordinates( coordinates: typing.List[Coordinate], x_min: float, x_max: float, ) -> typing.List[Coordinate]: """Select coordinates within a certain x-axis range.""" ...
The limitation of using the approach above is that you can only compose types that are already supported by Flyte.
To create entirely new types, you’ll need to use the
TypeTransformer interface to explicitly handle
the way in which the object is (a) serialized as a task output and (b)
deserialized when passed into a task as an input.
See the User Guide for an example of a custom type.
Customizing Flyte Tasks#
The easiest way to extend Flyte tasks is to use Python decorators. Since Flyte
tasks are simply functions, you can wrap the task function in a custom
decorator before wrapping the entire function in the
For example, if we want to do something before and after the actual task function is invoked, we can do the following:
from functools import partial, wraps def decorator(fn): @wraps(fn) def wrapper(*args, **kwargs): print("do something before") out = fn(*args, **kwargs) print("do something after") return out return wrapper
Then, making sure
@task is the outermost decorator, we can modify the
behavior of the task:
@task @decorator def add_one(x: int) -> int: return x + 1 add_one(x=10)
do something before do something after
This approach allows you to call out to some external service or library before
and after your task function body is executed. For example, this pattern is used
by the MLFlow integration via the
decorator to auto-log metrics during a model-training task.
The Plugin Hierarchy of Needs#
The decorator approach is great for many surface-level use cases, but there are many more ways to customize Flyte tasks:
Task extensions that use pre-built containers, useful for tasks that don’t require user-defined code and simply rely on input parameters.
Task extensions that require user-built containers when the task also requires user-defined code.
These tasks can be implemented in other programming languages like R,
Julia, etc. Useful for leveraging highly optimized domain-specific libraries
in other languages outside of the
These tasks plugins require implementing a backend plugin to leverage external services like Sagemaker, Snowflake, BigQuery, etc.
Congratulations! 🎉 You’ve just completed the Flyte Fundamentals tour.
The final section in the getting started section of the docs will provide you with some core use cases for implementing your first workflows, whether you’re a data scientist, data analyst, data engineer, or machine learning engineer.