Typed Columns in a Schema

This example explains how a typed schema can be used in Flyte and declared in flytekit.

import pandas
from flytekit import kwtypes, task, workflow

Flytekit ships with some pre-built type extensions; one of them is the FlyteSchema type.

from flytekit.types.schema import FlyteSchema

FlyteSchema is an abstract schema type that can represent any structured dataset with typed (or untyped) columns.

out_schema = FlyteSchema[kwtypes(x=int, y=str)]
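
To make the idea of typed columns concrete, here is a minimal stdlib-only sketch. It is not flytekit's implementation; it only assumes that a typed schema boils down to an ordered mapping from column name to Python type. The helper names column_types and row_matches are hypothetical and exist purely for illustration.

```python
from collections import OrderedDict
from typing import Any, Dict, Type


def column_types(**kwargs: Type) -> "OrderedDict[str, Type]":
    """Hypothetical helper: build an ordered column-name -> type mapping."""
    return OrderedDict(kwargs)


def row_matches(row: Dict[str, Any], columns: "OrderedDict[str, Type]") -> bool:
    """Return True if the row has exactly the declared columns with matching types."""
    if set(row) != set(columns):
        return False
    return all(isinstance(row[name], typ) for name, typ in columns.items())


cols = column_types(x=int, y=str)
good_row = {"x": 1, "y": "3"}
bad_row = {"x": "1", "y": "3"}  # x is a str here, not an int

print(list(cols.items()))
print(row_matches(good_row, cols), row_matches(bad_row, cols))
```

Keeping the mapping ordered matters because later steps in this example select columns by position.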

To write to a schema object, use the flytekit.types.schema.FlyteSchema.open() method. Writing can be done using any of the supported dataframe formats.

@task
def t1() -> out_schema:
    w = out_schema()
    df = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})
    w.open().write(df)
    return w

To read a schema, invoke flytekit.types.schema.FlyteSchema.open(). The open mode is configured automatically (a schema received as a task input is opened for reading), and the default returned dataframe type is pandas.DataFrame. Different dataframe types can be returned based on the type passed into the open method.

@task
def t2(schema: FlyteSchema[kwtypes(x=int, y=str)]) -> FlyteSchema[kwtypes(x=int)]:
    assert isinstance(schema, FlyteSchema)
    df: pandas.DataFrame = schema.open().all()
    return df[schema.column_names()[:-1]]


@workflow
def wf() -> FlyteSchema[kwtypes(x=int)]:
    return t2(schema=t1())
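
The projection that t2 performs (keep every declared column except the last) can be sketched without flytekit or pandas. The sketch assumes the data is a plain dict mapping column names to lists of values; drop_last_column is a hypothetical helper, not part of any library.

```python
from typing import Any, Dict, List


def drop_last_column(table: Dict[str, List[Any]], column_names: List[str]) -> Dict[str, List[Any]]:
    """Keep every declared column except the last one, preserving declared order."""
    kept = column_names[:-1]
    return {name: table[name] for name in kept}


table = {"x": [1, 2], "y": ["3", "4"]}
projected = drop_last_column(table, ["x", "y"])
print(projected)
```

With columns x and y declared in that order, only x survives, which matches the narrower output type declared on t2.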

Local execution converts the data to and from the serialized representation, thus mimicking a complete distributed execution.

if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running wf(), returns columns {wf().columns()}")
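
The round trip through a serialized representation can be illustrated with a small stdlib-only sketch. CSV is used here purely for illustration and is an assumption of this sketch, not the format flytekit uses; write_rows and read_rows are hypothetical helpers. The point is that the declared column types are what allow the values to be restored faithfully after serialization.

```python
import csv
import os
import tempfile
from typing import Any, Dict, List, Type


def write_rows(path: str, columns: Dict[str, Type], rows: List[Dict[str, Any]]) -> None:
    """Serialize rows to CSV, writing columns in their declared order."""
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(columns))
        writer.writeheader()
        writer.writerows(rows)


def read_rows(path: str, columns: Dict[str, Type]) -> List[Dict[str, Any]]:
    """Deserialize rows, casting each value back to its declared column type."""
    with open(path, newline="") as f:
        return [
            {name: typ(record[name]) for name, typ in columns.items()}
            for record in csv.DictReader(f)
        ]


columns = {"x": int, "y": str}
rows = [{"x": 1, "y": "3"}, {"x": 2, "y": "4"}]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "schema.csv")
    write_rows(path, columns, rows)
    restored = read_rows(path, columns)

print(restored == rows)
```

Without the column types, every value read back from the file would be a string, so x would no longer be an int.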
