Data Class#

Tags: Basic

When you have multiple values that you want to send across Flyte entities, you can use a dataclass.

Flytekit uses the Mashumaro library to serialize and deserialize dataclasses.


If you’re using a Flytekit version below v1.10, you’ll need to decorate your dataclass with @dataclass_json (imported via from dataclasses_json import dataclass_json) instead of inheriting from Mashumaro’s DataClassJSONMixin.

To begin, import the necessary dependencies.

import os
import tempfile
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
from mashumaro.mixins.json import DataClassJSONMixin

Python types#

We define a dataclass with int, str and dict as the data types.

@dataclass
class Datum(DataClassJSONMixin):
    x: int
    y: str
    z: dict[int, str]

You can send a dataclass between different tasks written in various languages, and input it through the Flyte console as raw JSON.
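The raw-JSON shape can be previewed without any Flyte machinery; a minimal stdlib sketch (this mirrors, but is not, Flytekit’s internal serialization):

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class Datum:
    x: int
    y: str
    z: dict[int, str]

d = Datum(x=1, y="one", z={1: "one"})
# JSON object keys are always strings, so z's int keys are emitted as "1"
payload = json.dumps(asdict(d))
print(payload)  # {"x": 1, "y": "one", "z": {"1": "one"}}
```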


All variables in a data class should be annotated with their type. Failure to do so will result in an error.
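The annotation requirement comes from Python’s own dataclass machinery: an unannotated class attribute is never registered as a field, so there is no type information to serialize with. A quick stdlib illustration:

```python
from dataclasses import dataclass, fields

@dataclass
class Sloppy:
    x: int = 0
    y = 0  # no type annotation: silently ignored by @dataclass

# Only the annotated attribute becomes a dataclass field
print([f.name for f in fields(Sloppy)])  # ['x']
```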

Once declared, a dataclass can be returned as an output or accepted as an input.

@task
def stringify(s: int) -> Datum:
    """A dataclass return will be treated as a single complex JSON return."""
    return Datum(x=s, y=str(s), z={s: str(s)})

@task
def add(x: Datum, y: Datum) -> Datum:
    """
    Flytekit automatically converts the provided JSON into a data class.
    If the structures don't match, it triggers a runtime failure.
    """
    return Datum(x=x.x + y.x, y=x.y + y.y, z=x.z)
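During local execution these tasks behave like plain Python functions; a stdlib-only sketch of the same logic (no Flyte runtime involved):

```python
from dataclasses import dataclass

@dataclass
class Datum:
    x: int
    y: str
    z: dict[int, str]

def stringify(s: int) -> Datum:
    return Datum(x=s, y=str(s), z={s: str(s)})

def add(x: Datum, y: Datum) -> Datum:
    # Mirrors the task above: sum the ints, concatenate the strings
    return Datum(x=x.x + y.x, y=x.y + y.y, z=x.z)

result = add(x=stringify(s=10), y=stringify(s=20))
print(result)  # Datum(x=30, y='1020', z={10: '10'})
```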

Flyte types#

We also define a data class that accepts StructuredDataset, FlyteFile and FlyteDirectory.

@dataclass
class FlyteTypes(DataClassJSONMixin):
    dataframe: StructuredDataset
    file: FlyteFile
    directory: FlyteDirectory

@task
def upload_data() -> FlyteTypes:
    """
    Flytekit will upload FlyteFile, FlyteDirectory and StructuredDataset to the blob store,
    such as GCP or S3.
    """
    # 1. StructuredDataset
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})

    # 2. FlyteDirectory
    temp_dir = tempfile.mkdtemp(prefix="flyte-")
    df.to_parquet(temp_dir + "/df.parquet")

    # 3. FlyteFile
    file_path = tempfile.NamedTemporaryFile(delete=False)
    file_path.write(b"Hello, World!")
    file_path.close()

    fs = FlyteTypes(
        dataframe=StructuredDataset(dataframe=df),
        file=FlyteFile(file_path.name),
        directory=FlyteDirectory(temp_dir),
    )
    return fs
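The temp-file plumbing above can be exercised without pandas or Flyte; a stdlib-only sketch (the placeholder bytes stand in for the parquet write). Note that closing the NamedTemporaryFile flushes the buffered write before anything else reads the path:

```python
import os
import tempfile

# Directory step, analogous to the FlyteDirectory above
temp_dir = tempfile.mkdtemp(prefix="flyte-")
with open(os.path.join(temp_dir, "df.parquet"), "wb") as fh:
    fh.write(b"placeholder")  # stand-in for df.to_parquet(...)

# File step, analogous to the FlyteFile above
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.write(b"Hello, World!")
tmp.close()  # flush to disk before reading the path elsewhere

print(os.listdir(temp_dir))  # ['df.parquet']
```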

@task
def download_data(res: FlyteTypes):
    assert pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}).equals(
        res.dataframe.open(pd.DataFrame).all()
    )
    f = open(res.file, "r")
    assert f.read() == "Hello, World!"
    f.close()
    assert os.listdir(res.directory) == ["df.parquet"]

A data class can hold fields of Python types, nested data classes, FlyteFile, FlyteDirectory and StructuredDataset.
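Nesting works through ordinary dataclass composition; a minimal stdlib sketch with hypothetical names (no Flyte types involved):

```python
from dataclasses import dataclass, field

@dataclass
class Inner:
    count: int = 0

@dataclass
class Outer:
    name: str = ""
    # Mutable defaults need a factory, not a shared instance
    inner: Inner = field(default_factory=Inner)

o = Outer(name="demo", inner=Inner(count=3))
print(o.inner.count)  # 3
```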

We define a workflow that calls the tasks created above.

@workflow
def dataclass_wf(x: int, y: int) -> (Datum, FlyteTypes):
    o1 = add(x=stringify(s=x), y=stringify(s=y))
    o2 = upload_data()
    download_data(res=o2)
    return o1, o2

You can run the workflow locally as follows:

if __name__ == "__main__":
    dataclass_wf(x=10, y=20)

To trigger a task that accepts a dataclass as an input with pyflyte run, you can provide a JSON file as an input:

pyflyte run \
  <path/to/the/example/script.py> \
  add --x dataclass_input.json --y dataclass_input.json
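The dataclass_input.json file referenced above could be produced like this (a sketch; the values are hypothetical, the keys must mirror Datum’s fields, and z’s keys are strings in JSON):

```python
import json

# Hypothetical input values matching the Datum dataclass
datum_input = {"x": 2, "y": "hello", "z": {"1": "one"}}
with open("dataclass_input.json", "w") as fh:
    json.dump(datum_input, fh)
```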