flytekit.types.structured.StructuredDatasetTransformerEngine#

class flytekit.types.structured.StructuredDatasetTransformerEngine[source]#

Think of this transformer as a higher-level meta transformer that is used for all the dataframe types. If you are bringing a custom data frame type, or any data frame type, to flytekit, you should register it with this transformer rather than with the main type engine.

Methods

assert_type(t, v)[source]#
Parameters
encode(ctx, sd, df_type, protocol, format, structured_literal_type)[source]#
Parameters
Return type

flytekit.models.literals.Literal

classmethod get_decoder(df_type, protocol, format)[source]#
Parameters
  • df_type (Type) –

  • protocol (str) –

  • format (str) –

classmethod get_encoder(df_type, protocol, format)[source]#
Parameters
  • df_type (Type) –

  • protocol (str) –

  • format (str) –

get_literal_type(t)[source]#

Provide a concrete implementation so that writers of custom dataframe handlers don't need to, since there's nothing special about the literal type: any dataframe type will always be associated with the structured dataset type. The other aspects (columns, external schema type, etc.) can be read from the associated metadata.

Parameters

t (Union[Type[flytekit.types.structured.structured_dataset.StructuredDataset], Any]) – The python dataframe type, which is mostly ignored.

Return type

flytekit.models.types.LiteralType

guess_python_type(literal_type)[source]#

Converts the Flyte LiteralType to a python object type.

Parameters

literal_type (flytekit.models.types.LiteralType) –

Return type

Type[flytekit.types.structured.structured_dataset.T]

iter_as(ctx, sd, df_type, updated_metadata)[source]#
Parameters
  • ctx (flytekit.core.context_manager.FlyteContext) –

  • sd (flytekit.models.literals.StructuredDataset) –

  • df_type (Type[flytekit.types.structured.structured_dataset.DF]) –

  • updated_metadata (flytekit.models.literals.StructuredDatasetMetadata) –

Return type

Generator[flytekit.types.structured.structured_dataset.DF, None, None]

open_as(ctx, sd, df_type, updated_metadata)[source]#
Parameters
  • ctx (flytekit.core.context_manager.FlyteContext) – A FlyteContext, useful in accessing the filesystem and other attributes

  • sd (flytekit.models.literals.StructuredDataset) –

  • df_type (Type[flytekit.types.structured.structured_dataset.DF]) –

  • updated_metadata (flytekit.models.literals.StructuredDatasetMetadata) – New metadata type, since it might be different from the metadata in the literal.

Returns

dataframe. It could be pandas dataframe or arrow table, etc.

Return type

flytekit.types.structured.structured_dataset.DF

classmethod register(h, default_for_type=True, override=False)[source]#

Call this with any handler to register it with this dataframe meta-transformer.

The string “://” should not be present in any handler’s protocol so we don’t check for it.

Parameters
  • h (Handlers) –

  • default_for_type (Optional[bool]) –

  • override (Optional[bool]) –
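As an illustration of how this registration could be organized, the sketch below models the nested handler registry shape shown in the DECODERS/ENCODERS attributes on this page (df_type, then protocol, then format). The helper names, the stand-in handler values, and the error messages are hypothetical, not flytekit's actual implementation; only the dict layout and the bare-protocol convention (no "://") come from this page.

```python
# Illustrative sketch of a nested handler registry:
# df_type -> protocol -> format -> handler.
# Only the dict shape mirrors the ENCODERS/DECODERS attributes documented
# on this page; register()/get_handler() here are hypothetical helpers.

ENCODERS = {}

def register(registry, df_type, protocol, fmt, handler, override=False):
    # Per the docstring above, a handler's protocol is expected to be a bare
    # scheme like "s3" or "gs"; we reject full URIs here for illustration.
    if "://" in protocol:
        raise ValueError("protocol should be a bare scheme like 's3', not a URI")
    formats = registry.setdefault(df_type, {}).setdefault(protocol, {})
    if fmt in formats and not override:
        raise ValueError(f"handler already registered for {protocol}/{fmt}")
    formats[fmt] = handler

def get_handler(registry, df_type, protocol, fmt):
    # Models the get_encoder/get_decoder lookup: a missing entry is an error.
    try:
        return registry[df_type][protocol][fmt]
    except KeyError:
        raise ValueError(f"no handler for {df_type} / {protocol} / {fmt}")

class FakePandasFrame:  # stand-in for pandas.DataFrame
    pass

register(ENCODERS, FakePandasFrame, "s3", "parquet", "pandas-to-parquet-handler")
print(get_handler(ENCODERS, FakePandasFrame, "s3", "parquet"))
```

The real register() additionally takes default_for_type, which this sketch omits for brevity.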

to_html(ctx, python_val, expected_python_type)[source]#

Converts any Python value (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.

Parameters
Return type

str

to_literal(ctx, python_val, python_type, expected)[source]#

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError clearly stating the mismatch.

Parameters
  • ctx – A FlyteContext, useful in accessing the filesystem and other attributes

  • python_val – The actual value to be transformed

  • python_type – The assumed type of the value (this matches the declared type on the function)

  • expected – Expected Literal Type
Return type

flytekit.models.literals.Literal
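The DEFAULT_FORMATS and DEFAULT_PROTOCOLS attributes listed below suggest per-type fallbacks when a task annotation does not pin a format or protocol. The sketch below models that defaulting as plain Python; the resolve() helper, the string keys, and the "/" local-filesystem fallback are assumptions for illustration, not flytekit's actual code path.

```python
# Hypothetical model of per-type defaulting during encoding.
# The dict contents mirror the DEFAULT_FORMATS / DEFAULT_PROTOCOLS
# attributes documented on this page (keyed here by name, not class).

DEFAULT_FORMATS = {"pandas.DataFrame": "parquet", "pyarrow.Table": "parquet"}
DEFAULT_PROTOCOLS = {"pandas.DataFrame": "s3", "pyarrow.Table": "s3"}

def resolve(df_type_name, fmt=None, protocol=None):
    # Explicit values from the annotation win; otherwise fall back to the
    # registered per-type default (assumed "/" = local filesystem as a
    # last resort, per the "/" protocol key in the DECODERS table).
    fmt = fmt or DEFAULT_FORMATS.get(df_type_name, "")
    protocol = protocol or DEFAULT_PROTOCOLS.get(df_type_name, "/")
    return protocol, fmt

print(resolve("pandas.DataFrame"))                           # defaults apply
print(resolve("pandas.DataFrame", fmt="csv", protocol="gs")) # explicit wins
```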

to_python_value(ctx, lv, expected_python_type)[source]#

The only tricky thing with converting a Literal (say, the output of an earlier task) to a Python value at the start of a task execution is the column subsetting behavior. For example, if you have

def t1() -> Annotated[StructuredDataset, kwtypes(col_a=int, col_b=float)]: ...
def t2(in_a: Annotated[StructuredDataset, kwtypes(col_b=float)]): ...

where t2(in_a=t1()), then when t2 calls in_a.open(pd.DataFrame).all(), it should get a DataFrame with only one column.

The StructuredDatasetType passed to the decoder depends on both the StructuredDatasetType of the incoming Literal and the StructuredDatasetType of the currently running task:

  • If the currently running task has columns defined: the StructuredDatasetType passed to the decoder will have the columns as defined by the type annotation of the currently running task, regardless of whether the incoming Literal has columns defined. Decoders should then subset the incoming data to the columns requested.

  • If the currently running task has [] columns or None, and the incoming Literal has columns defined: the StructuredDatasetType passed to the decoder will have the columns from the incoming Literal. This is the scenario where the Literal returned by the running task will have more information than the running task's signature.

  • If both have [] columns or None: the StructuredDatasetType passed to the decoder will have an empty list of columns.

Parameters
Return type

flytekit.types.structured.structured_dataset.T
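The column-subsetting rule described in to_python_value above can be condensed into a small function. This is an illustrative model of the rule only, not the engine's actual code; columns are represented as plain lists of names, with [] or None standing for "no columns declared".

```python
def columns_for_decoder(incoming_cols, task_cols):
    """Model of the column-subsetting rule in to_python_value.

    incoming_cols: columns on the incoming Literal's StructuredDatasetType.
    task_cols: columns on the currently running task's type annotation.
    Either may be [] or None, meaning no columns are declared.
    """
    if task_cols:
        # The task annotation wins; decoders should subset the incoming
        # data to exactly these columns.
        return task_cols
    if incoming_cols:
        # The Literal carries more information than the task's signature,
        # so its columns flow through to the decoder.
        return incoming_cols
    # Neither side declares columns: the decoder sees an empty column list.
    return []

# Mirrors the t1()/t2() example above: t2's annotation requests only col_b.
print(columns_for_decoder(["col_a", "col_b"], ["col_b"]))  # ['col_b']
print(columns_for_decoder(["col_a", "col_b"], None))       # ['col_a', 'col_b']
print(columns_for_decoder(None, None))                     # []
```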

Attributes

DECODERS: Dict[Type, Dict[str, Dict[str, flytekit.types.structured.structured_dataset.StructuredDatasetDecoder]]] = {<class 'pandas.core.frame.DataFrame'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToPandasDecodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToPandasDecodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToPandasDecodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.BQToPandasDecodingHandler object>}}, <class 'pyarrow.lib.Table'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToArrowDecodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToArrowDecodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToArrowDecodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.BQToArrowDecodingHandler object>}}, <class 'pyspark.sql.dataframe.DataFrame'>: {'/': {'parquet': <flytekitplugins.spark.sd_transformers.ParquetToSparkDecodingHandler object>}, 's3': {'parquet': <flytekitplugins.spark.sd_transformers.ParquetToSparkDecodingHandler object>}}}#
DEFAULT_FORMATS: Dict[Type, str] = {<class 'pandas.core.frame.DataFrame'>: 'parquet', <class 'pyarrow.lib.Table'>: 'parquet', <class 'pyspark.sql.dataframe.DataFrame'>: 'parquet'}#
DEFAULT_PROTOCOLS: Dict[Type, str] = {<class 'pandas.core.frame.DataFrame'>: 's3', <class 'pyarrow.lib.Table'>: 's3', <class 'pyspark.sql.dataframe.DataFrame'>: 's3'}#
ENCODERS: Dict[Type, Dict[str, Dict[str, flytekit.types.structured.structured_dataset.StructuredDatasetEncoder]]] = {<class 'pandas.core.frame.DataFrame'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.PandasToParquetEncodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.PandasToParquetEncodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.PandasToParquetEncodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.PandasToBQEncodingHandlers object>}}, <class 'pyarrow.lib.Table'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.ArrowToParquetEncodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.ArrowToParquetEncodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.ArrowToParquetEncodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.ArrowToBQEncodingHandlers object>}}, <class 'pyspark.sql.dataframe.DataFrame'>: {'/': {'parquet': <flytekitplugins.spark.sd_transformers.SparkToParquetEncodingHandler object>}, 's3': {'parquet': <flytekitplugins.spark.sd_transformers.SparkToParquetEncodingHandler object>}}}#
Handlers#

alias of Union[flytekit.types.structured.structured_dataset.StructuredDatasetEncoder, flytekit.types.structured.structured_dataset.StructuredDatasetDecoder]
