Pickle type

Tags: Basic

Flyte enforces type safety by using type information to compile tasks and workflows, which enables features such as static analysis and conditional branching.

However, we also strive to offer flexibility to end-users so they don’t have to invest heavily in understanding their data structures upfront before experiencing the value Flyte has to offer.

Flyte supports the FlytePickle transformer, which converts any unrecognized type hint into FlytePickle, enabling the serialization/deserialization of Python values to/from a pickle file.
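As a rough illustration of what "serialization/deserialization to/from a pickle file" means, here is a minimal sketch that uses only Python's standard-library `pickle` module. It is not Flytekit's internal code: the `round_trip` helper is hypothetical, and `SimpleNamespace` merely stands in for an arbitrary object that Flyte has no dedicated transformer for.

```python
import pickle
import tempfile
from pathlib import Path
from types import SimpleNamespace


def round_trip(value):
    """Write `value` to a pickle file, then load it back (hypothetical helper)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "value.pkl"
        path.write_bytes(pickle.dumps(value))   # serialize to a file
        return pickle.loads(path.read_bytes())  # deserialize from the file


# Stand-in for an object with no Flyte-native type.
original = SimpleNamespace(name="Thor", power="Flight")
restored = round_trip(original)
print(restored.name, restored.power)
```

The restored object is a new instance with the same attributes, which is broadly what a downstream task receives when a pickled value is passed to it.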

Important

Pickle can only be used to send objects between the exact same Python version. Pickle types can also result in lower performance, so where possible either use Python types that Flyte supports or register a custom transformer.

The example on this page demonstrates how to use custom objects without registering a transformer.

Note

To clone and run the example code on this page, see the Flytesnacks repo.

data_types_and_io/pickle_type.py
from flytekit import task, workflow

Superhero represents a user-defined complex type that can be serialized to a pickle file by Flytekit and transferred between tasks as both input and output data.

Note

Alternatively, you can turn this object into a dataclass for improved performance. We have used a simple object here for demonstration purposes.

data_types_and_io/pickle_type.py
class Superhero:
    def __init__(self, name, power):
        self.name = name
        self.power = power


@task
def welcome_superhero(name: str, power: str) -> Superhero:
    return Superhero(name, power)


@task
def greet_superhero(superhero: Superhero) -> str:
    return f"👋 Hello {superhero.name}! Your superpower is {superhero.power}."


@workflow
def superhero_wf(name: str = "Thor", power: str = "Flight") -> str:
    superhero = welcome_superhero(name=name, power=power)
    return greet_superhero(superhero=superhero)
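The note above mentions turning the object into a dataclass. A minimal sketch of that variant, using only the standard library, might look like the following. Whether Flytekit accepts a plain dataclass as a task input or output, or needs an additional JSON mixin or decorator, depends on the Flytekit version, so treat this as an assumption and check the dataclass documentation for your release.

```python
from dataclasses import asdict, dataclass


@dataclass
class Superhero:
    # Typed fields replace the untyped attributes set in __init__.
    name: str
    power: str


hero = Superhero(name="Thor", power="Flight")
print(asdict(hero))
```

Because the fields are typed, the structure of the value is known up front instead of being hidden inside an opaque pickle file.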

Batch size

By default, if the list subtype is unrecognized, the whole list is serialized into a single pickle file. To improve serialization and deserialization performance for lists with many items or with large elements, you can specify a batch size, so that each batch is written as a separate pickle file. The following example demonstrates how to set the batch size.

data_types_and_io/pickle_type.py
from typing import Iterator

from flytekit.types.pickle.pickle import BatchSize
from typing_extensions import Annotated


@task
def welcome_superheroes(names: list[str], powers: list[str]) -> Annotated[list[Superhero], BatchSize(3)]:
    return [Superhero(name, power) for name, power in zip(names, powers)]


@task
def greet_superheroes(superheroes: list[Superhero]) -> Iterator[str]:
    for superhero in superheroes:
        yield f"👋 Hello {superhero.name}! Your superpower is {superhero.power}."


@workflow
def superheroes_wf(
    names: list[str] = ["Thor", "Spiderman", "Hulk"],
    powers: list[str] = ["Flight", "Surface clinger", "Shapeshifting"],
) -> Iterator[str]:
    superheroes = welcome_superheroes(names=names, powers=powers)
    return greet_superheroes(superheroes=superheroes)

Note

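With BatchSize(3) and the three default superheroes, the welcome_superheroes task generates a single pickle file containing all three. A batch size of 2 would instead produce two pickle files: one containing two superheroes and the other containing one superhero.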
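The relationship between batch size and the number of pickle files can be sketched with plain Python. The `pickle_in_batches` function below is a hypothetical helper, not a Flytekit API, and it assumes the list is split into consecutive chunks of at most `batch_size` items; tuples stand in for the Superhero objects.

```python
import pickle


def pickle_in_batches(items, batch_size):
    """Split `items` into consecutive batches and pickle each batch separately."""
    return [
        pickle.dumps(items[i : i + batch_size])
        for i in range(0, len(items), batch_size)
    ]


heroes = [
    ("Thor", "Flight"),
    ("Spiderman", "Surface clinger"),
    ("Hulk", "Shapeshifting"),
]

for size in (2, 3):
    blobs = pickle_in_batches(heroes, size)
    # Print the batch size and the number of items in each pickled batch.
    print(size, [len(pickle.loads(blob)) for blob in blobs])
# prints:
# 2 [2, 1]
# 3 [3]
```

Under this assumption, three items yield two batches for a batch size of 2 and a single batch for a batch size of 3.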

You can run the workflows locally as follows:

data_types_and_io/pickle_type.py
if __name__ == "__main__":
    print(f"Superhero wf: {superhero_wf()}")
    print(f"Superhero(es) wf: {superheroes_wf()}")