Pydantic BaseModel#

Tags: Basic

flytekit version >=1.14 supports natively the JSON format that Pydantic BaseModel produces, enhancing the interoperability of Pydantic BaseModels with the Flyte type system.

Important

Pydantic BaseModel V2 only works when you are using flytekit version >= v1.14.0.

With the 1.14 release, flytekit adopted MessagePack as the serialization format for Pydantic BaseModel, overcoming a major limitation of serialization into a JSON string within a Protobuf struct datatype like the previous versions do:

to store int types, Protobuf’s struct converts them to float, forcing users to write boilerplate code to work around this issue.

Important

By default, flytekit >= 1.14 will produce msgpack bytes literals when serializing, preserving the types defined in your BaseModel class. If you’re serializing BaseModel using flytekit version >= v1.14.0 and you want to produce Protobuf struct literal instead, you can set environment variable FLYTE_USE_OLD_DC_FORMAT to true.

For more details, you can refer the MESSAGEPACK IDL RFC: https://github.com/flyteorg/flyte/blob/master/rfc/system/5741-binary-idl-with-message-pack.md

Note

You can put Dataclass and FlyteTypes (FlyteFile, FlyteDirectory, FlyteSchema, and StructuredDataset) in a pydantic BaseModel.

Note

To clone and run the example code on this page, see the Flytesnacks repo.

To begin, import the necessary dependencies:

data_types_and_io/pydantic_basemodel.py#
import os
import tempfile

import pandas as pd
from flytekit import ImageSpec, task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
from pydantic import BaseModel

Build your custom image with ImageSpec:

data_types_and_io/pydantic_basemodel.py#
image_spec = ImageSpec(
    registry="ghcr.io/flyteorg",
    packages=["pandas", "pyarrow", "pydantic"],
)

Python types#

We define a pydantic basemodel with int, str and dict as the data types.

data_types_and_io/pydantic_basemodel.py#
class Datum(BaseModel):
    x: int
    y: str
    z: dict[int, str]

You can send a pydantic basemodel between different tasks written in various languages, and input it through the Flyte console as raw JSON.

Note

All variables in a data class should be annotated with their type. Failure to do should will result in an error.

Once declared, a dataclass can be returned as an output or accepted as an input.

data_types_and_io/pydantic_basemodel.py#
@task(container_image=image_spec)
def stringify(s: int) -> Datum:
    """
    A Pydantic model return will be treated as a single complex JSON return.
    """
    return Datum(x=s, y=str(s), z={s: str(s)})


@task(container_image=image_spec)
def add(x: Datum, y: Datum) -> Datum:
    """
    Flytekit automatically converts the provided JSON into a Pydantic model.
    If the structures don't match, it triggers a runtime failure.
    """
    x.z.update(y.z)
    return Datum(x=x.x + y.x, y=x.y + y.y, z=x.z)

Flyte types#

We also define a data class that accepts StructuredDataset, FlyteFile and FlyteDirectory.

data_types_and_io/pydantic_basemodel.py#
class FlyteTypes(BaseModel):
    dataframe: StructuredDataset
    file: FlyteFile
    directory: FlyteDirectory


@task(container_image=image_spec)
def upload_data() -> FlyteTypes:
    """
    Flytekit will upload FlyteFile, FlyteDirectory, and StructuredDataset to the blob store,
    such as GCP or S3.
    """
    # 1. StructuredDataset
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})

    # 2. FlyteDirectory
    temp_dir = tempfile.mkdtemp(prefix="flyte-")
    df.to_parquet(os.path.join(temp_dir, "df.parquet"))

    # 3. FlyteFile
    file_path = tempfile.NamedTemporaryFile(delete=False)
    file_path.write(b"Hello, World!")
    file_path.close()

    fs = FlyteTypes(
        dataframe=StructuredDataset(dataframe=df),
        file=FlyteFile(file_path.name),
        directory=FlyteDirectory(temp_dir),
    )
    return fs


@task(container_image=image_spec)
def download_data(res: FlyteTypes):
    expected_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    actual_df = res.dataframe.open(pd.DataFrame).all()
    assert expected_df.equals(actual_df), "DataFrames do not match!"

    with open(res.file, "r") as f:
        assert f.read() == "Hello, World!", "File contents do not match!"

    assert os.listdir(res.directory) == ["df.parquet"], "Directory contents do not match!"

A data class supports the usage of data associated with Python types, data classes, flyte file, flyte directory and structured dataset.

We define a workflow that calls the tasks created above.

data_types_and_io/pydantic_basemodel.py#
@workflow
def basemodel_wf(x: int, y: int) -> (Datum, FlyteTypes):
    o1 = add(x=stringify(s=x), y=stringify(s=y))
    o2 = upload_data()
    download_data(res=o2)
    return o1, o2

You can run the workflow locally as follows:

data_types_and_io/pydantic_basemodel.py#
if __name__ == "__main__":
    basemodel_wf(x=10, y=20)

To trigger a task that accepts a dataclass as an input with pyflyte run, you can provide a JSON file as an input:

pyflyte run \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/b71e01d45037cea883883f33d8d93f258b9a5023/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py \
  basemodel_wf --x 1 --y 2