Typed Columns in a SchemaΒΆ

This example explains how a typed schema can be used in Flyte and declared in flytekit.

import pandas
from flytekit import kwtypes, task, workflow

Flytekit consists of some pre-built type extensions, one of them is the FlyteSchema type

from flytekit.types.schema import FlyteSchema

FlyteSchema is an abstract Schema type that can be used to represent any structured dataset which has typed (or untyped) columns

out_schema = FlyteSchema[kwtypes(x=int, y=str)]

To write to a schema object refer to flytekit.types.schema.FlyteSchema.open() method. Writing can be done using any of the supported dataframe formats.

def t1() -> out_schema:
    w = out_schema()
    df = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})
    return w

To read a Schema, one has to invoke the flytekit.types.schema.FlyteSchema.open(). The default mode is automatically configured to be open and the default returned dataframe type is pandas.DataFrame Different types of dataframes can be returned based on the type passed into the open method

def t2(schema: FlyteSchema[kwtypes(x=int, y=str)]) -> FlyteSchema[kwtypes(x=int)]:
    assert isinstance(schema, FlyteSchema)
    df: pandas.DataFrame = schema.open().all()
    return df[schema.column_names()[:-1]]

def wf() -> FlyteSchema[kwtypes(x=int)]:
    return t2(schema=t1())

Local execution will convert the data to and from the serialized representation thus, mimicking a complete distributed execution.

if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running wf(), returns columns {wf().columns()}")

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery