Typed Columns in a Schema
This example explains how a typed schema can be used in Flyte and declared in flytekit.
Flytekit includes some pre-built type extensions, one of which is the FlyteSchema type.
import pandas
from flytekit import kwtypes, task, workflow
from flytekit.types.schema import FlyteSchema
FlyteSchema is an abstract schema type that can represent any structured dataset with typed (or untyped) columns. Column names and their types are declared with kwtypes:
out_schema = FlyteSchema[kwtypes(x=int, y=str)]
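Conceptually, a typed schema is an ordered mapping from column names to Python types. The sketch below models that idea in plain Python so it runs without flytekit. `kwtypes_sketch` and `validate_frame` are hypothetical helpers, not flytekit APIs, and the "frame" is assumed to be a dict of column lists rather than a real dataframe.

```python
from collections import OrderedDict
from typing import Dict, List, Type


def kwtypes_sketch(**columns: Type) -> "OrderedDict[str, Type]":
    # Hypothetical stand-in for kwtypes: keyword arguments keep their call
    # order (Python 3.7+), so column x comes before column y.
    return OrderedDict(columns)


def validate_frame(schema: "OrderedDict[str, Type]", frame: Dict[str, List]) -> None:
    # Hypothetical check: the frame must have exactly the declared columns,
    # in the declared order, and every value must match its column type.
    if list(frame) != list(schema):
        raise ValueError(f"expected columns {list(schema)}, got {list(frame)}")
    for name, col_type in schema.items():
        for value in frame[name]:
            if not isinstance(value, col_type):
                raise TypeError(f"column {name!r}: {value!r} is not {col_type.__name__}")


schema = kwtypes_sketch(x=int, y=str)
# No exception: the data matches the declared column types
validate_frame(schema, {"x": [1, 2], "y": ["3", "4"]})
```

Passing `{"x": [1, 2], "y": [3, 4]}` instead raises a TypeError, because the y column is declared as str.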
To write to a schema object, use the FlyteSchema.open method. Writing can be done using any of the supported dataframe formats.
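@task
def t1() -> out_schema:
    # Create an empty schema object with columns x (int) and y (str)
    w = out_schema()
    df = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})
    # Open the schema for writing and write the dataframe into it
    w.open().write(df)
    return w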
To read a schema, invoke the FlyteSchema.open method. The open mode is configured automatically (a schema received as a task input is opened for reading), and the default returned dataframe type is pandas.DataFrame. Different types of dataframes can be returned depending on the type passed to the open method.
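Choosing the returned dataframe type by passing a type to open amounts to dispatching on the requested type. The sketch below illustrates that general pattern with a small registry of reader functions keyed by target type. `register_reader`, `open_as`, and `STORED` are hypothetical names, and plain `dict`/`list` stand in for dataframe libraries; this is not how flytekit implements FlyteSchema.open internally.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stored data: columns in declaration order
STORED = {"x": [1, 2], "y": ["3", "4"]}

_READERS: Dict[type, Callable[[Dict[str, List[Any]]], Any]] = {}


def register_reader(target: type):
    # Decorator that records which function produces a given output type
    def decorator(fn):
        _READERS[target] = fn
        return fn
    return decorator


@register_reader(dict)
def _as_columns(data):
    # Column-oriented view: {"x": [...], "y": [...]}
    return {name: list(values) for name, values in data.items()}


@register_reader(list)
def _as_rows(data):
    # Row-oriented view: one dict per row
    names = list(data)
    return [dict(zip(names, row)) for row in zip(*data.values())]


def open_as(data, fmt: type = dict):
    # Callers get the default format unless they ask for another one
    try:
        return _READERS[fmt](data)
    except KeyError:
        raise TypeError(f"no reader registered for {fmt.__name__}") from None
```

`open_as(STORED)` returns the column-oriented dict, `open_as(STORED, list)` returns `[{"x": 1, "y": "3"}, {"x": 2, "y": "4"}]`, and an unregistered type such as `tuple` raises a TypeError.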
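@task
def t2(schema: FlyteSchema[kwtypes(x=int, y=str)]) -> FlyteSchema[kwtypes(x=int)]:
    assert isinstance(schema, FlyteSchema)
    # open() without arguments returns a pandas.DataFrame
    df: pandas.DataFrame = schema.open().all()
    # Keep every column except the last one (y), matching the output schema
    return df[schema.column_names()[:-1]]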
@workflow
def wf() -> FlyteSchema[kwtypes(x=int)]:
    return t2(schema=t1())
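The slice `column_names()[:-1]` in t2 only works because the declared columns are ordered: dropping the last name turns the (x, y) schema into the (x,) output schema. A minimal plain-Python sketch of the same projection, assuming a dict-of-lists frame and a hypothetical `drop_last_column` helper (not a flytekit API):

```python
from collections import OrderedDict
from typing import Dict, List, Tuple


def drop_last_column(
    schema: "OrderedDict[str, type]", frame: Dict[str, List]
) -> Tuple["OrderedDict[str, type]", Dict[str, List]]:
    # Same slicing idea as column_names()[:-1]: keep every column but the last
    keep = list(schema)[:-1]
    projected_schema = OrderedDict((name, schema[name]) for name in keep)
    projected_frame = {name: frame[name] for name in keep}
    return projected_schema, projected_frame


in_schema = OrderedDict([("x", int), ("y", str)])
projected_schema, projected_frame = drop_last_column(in_schema, {"x": [1, 2], "y": ["3", "4"]})
print(list(projected_schema), projected_frame)  # ['x'] {'x': [1, 2]}
```

Deriving the output schema from the same slice keeps the data and its declared type in sync, which is what the `FlyteSchema[kwtypes(x=int)]` return annotation expresses.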
Local execution converts the data to and from its serialized representation, thus mimicking a complete distributed execution.
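A serialized round trip is where declared column types earn their keep: a text format stores every value as a string, and the schema tells the reader how to restore the original types. The sketch below shows that idea with CSV, chosen only because it is in the standard library; it is not meant to reflect Flyte's actual storage format, and `serialize`/`deserialize` are hypothetical helpers.

```python
import csv
import io
from collections import OrderedDict
from typing import Dict, List


def serialize(frame: Dict[str, List]) -> str:
    # Write a header row, then one row per record; every value becomes text
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(frame.keys())
    writer.writerows(zip(*frame.values()))
    return buf.getvalue()


def deserialize(text: str, schema: "OrderedDict[str, type]") -> Dict[str, List]:
    # Read the CSV back and use the declared column types to restore values
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    if header != list(schema):
        raise ValueError(f"header {header} does not match schema {list(schema)}")
    columns: Dict[str, List] = {name: [] for name in header}
    for row in reader:
        for name, raw in zip(header, row):
            columns[name].append(schema[name](raw))
    return columns


schema = OrderedDict([("x", int), ("y", str)])
original = {"x": [1, 2], "y": ["3", "4"]}
# The int column comes back as ints, not strings
assert deserialize(serialize(original), schema) == original
```

Without the schema, the x column would come back as `["1", "2"]`, which is the kind of silent type drift that typed columns guard against.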
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running wf(), returns columns {wf().columns()}")