flytekit.types.structured.StructuredDatasetTransformerEngine#
- class flytekit.types.structured.StructuredDatasetTransformerEngine[source]#
Think of this transformer as a higher-level meta transformer that is used for all dataframe types. If you are bringing a custom dataframe type, or any dataframe type, to flytekit, register it with this transformer rather than with the main type engine.
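For illustration, a minimal sketch of what registering a custom dataframe type might look like. MyDataFrame, its to_parquet method, and the encoder class are all hypothetical; the constructor arguments mirror those used by flytekit's built-in parquet handlers.

```python
# Hedged sketch: MyDataFrame and its to_parquet() method are hypothetical
# stand-ins for whatever dataframe library you are integrating.
import typing

from flytekit.core.context_manager import FlyteContext
from flytekit.models import literals
from flytekit.models.types import StructuredDatasetType
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetEncoder,
    StructuredDatasetTransformerEngine,
)


class MyDataFrame:
    """Hypothetical custom dataframe type."""

    def to_parquet(self, path: str) -> None:
        ...


class MyDataFrameToParquetEncoder(StructuredDatasetEncoder):
    def __init__(self):
        super().__init__(python_type=MyDataFrame, protocol="s3", supported_format=PARQUET)

    def encode(
        self,
        ctx: FlyteContext,
        structured_dataset: StructuredDataset,
        structured_dataset_type: StructuredDatasetType,
    ) -> literals.StructuredDataset:
        # Write the dataframe to object storage and return a literal pointing at it.
        path = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
        typing.cast(MyDataFrame, structured_dataset.dataframe).to_parquet(path)
        return literals.StructuredDataset(
            uri=path,
            metadata=literals.StructuredDatasetMetadata(structured_dataset_type),
        )


# Register with this engine, not the main TypeEngine.
StructuredDatasetTransformerEngine.register(MyDataFrameToParquetEncoder(), default_for_type=True)
```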
Methods
- encode(ctx, sd, df_type, protocol, format, structured_literal_type)[source]#
- Parameters
ctx (flytekit.core.context_manager.FlyteContext) –
sd (flytekit.types.structured.structured_dataset.StructuredDataset) –
df_type (Type) –
protocol (str) –
format (str) –
structured_literal_type (flytekit.models.types.StructuredDatasetType) –
- Return type
flytekit.models.literals.Literal
- get_literal_type(t)[source]#
Provide a concrete implementation so that writers of custom dataframe handlers don't have to provide one, since there's nothing special about the literal type. Any dataframe type will always be associated with the structured dataset type. The other aspects of it - columns, external schema type, etc. - can be read from the associated metadata.
- Parameters
t (Union[Type[flytekit.types.structured.structured_dataset.StructuredDataset], Any]) – The python dataframe type, which is mostly ignored.
- Return type
flytekit.models.types.LiteralType
- guess_python_type(literal_type)[source]#
Converts the Flyte LiteralType to a python object type.
- Parameters
literal_type (flytekit.models.types.LiteralType) –
- Return type
Type[flytekit.types.structured.structured_dataset.T]
- iter_as(ctx, sd, df_type, updated_metadata)[source]#
- open_as(ctx, sd, df_type, updated_metadata)[source]#
- Parameters
ctx (flytekit.core.context_manager.FlyteContext) – A FlyteContext, useful in accessing the filesystem and other attributes
sd (flytekit.models.literals.StructuredDataset) –
df_type (Type[flytekit.types.structured.structured_dataset.DF]) –
updated_metadata (flytekit.models.literals.StructuredDatasetMetadata) – New metadata type, since it might be different from the metadata in the literal.
- Returns
The dataframe, e.g. a pandas DataFrame or an Arrow Table (see the usage sketch below).
- Return type
flytekit.types.structured.structured_dataset.DF
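These two methods back the user-facing StructuredDataset.open() API; a hedged sketch of the calls that route through them:

```python
# Hedged sketch: sd.open(...).all() routes through open_as, and
# sd.open(...).iter() routes through iter_as.
import pandas as pd

from flytekit.types.structured import StructuredDataset


def consume(sd: StructuredDataset) -> None:
    # Materialize the whole dataset as a single pandas DataFrame.
    df = sd.open(pd.DataFrame).all()
    print(df.shape)

    # Or stream it chunk by chunk.
    for chunk in sd.open(pd.DataFrame).iter():
        print(len(chunk))
```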
- classmethod register(h, default_for_type=True, override=False)[source]#
Call this with any handler to register it with this dataframe meta-transformer. A handler's protocol should not contain the string "://" (i.e. use "s3", not "s3://"); this is assumed rather than checked.
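A hedged usage sketch of the keyword arguments, reusing the hypothetical MyDataFrameToParquetEncoder from the class-level sketch above:

```python
# Assumption: default_for_type=True records the handler's protocol and format in
# DEFAULT_PROTOCOLS / DEFAULT_FORMATS for its python_type; replacing a handler
# already registered for the same (type, protocol, format) slot requires override=True.
StructuredDatasetTransformerEngine.register(
    MyDataFrameToParquetEncoder(),  # hypothetical handler
    default_for_type=False,
    override=True,
)
```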
- to_html(ctx, python_val, expected_python_type)[source]#
Converts any Python value (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.
- to_literal(ctx, python_val, python_type, expected)[source]#
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.
- Parameters
ctx (flytekit.core.context_manager.FlyteContext) – A FlyteContext, useful in accessing the filesystem and other attributes
python_val (Union[flytekit.types.structured.structured_dataset.StructuredDataset, Any]) – The actual value to be transformed
python_type (Union[Type[flytekit.types.structured.structured_dataset.StructuredDataset], Type]) – The assumed type of the value (this matches the declared type on the function)
expected (flytekit.models.types.LiteralType) – Expected Literal Type
- Return type
flytekit.models.literals.Literal
- to_python_value(ctx, lv, expected_python_type)[source]#
The only tricky thing with converting a Literal (say the output of an earlier task) to a Python value at the start of a task execution is the column subsetting behavior. For example, if you have

def t1() -> Annotated[StructuredDataset, kwtypes(col_a=int, col_b=float)]: ...
def t2(in_a: Annotated[StructuredDataset, kwtypes(col_b=float)]): ...

where t2(in_a=t1()), then when t2 does in_a.open(pd.DataFrame).all(), it should get a DataFrame with only one column (a runnable sketch follows at the end of this method's entry).
| StructuredDatasetType of currently running task | StructuredDatasetType of the incoming Literal | StructuredDatasetType passed to the decoder |
| --- | --- | --- |
| Has columns defined | Has columns defined, or [] columns or None | Has the columns as defined by the type annotation of the currently running task. Decoders should then subset the incoming data to the columns requested. |
| [] columns or None | Has columns defined | Has the columns from the incoming Literal. This is the scenario where the Literal returned by the running task will have more information than the running task's signature. |
| [] columns or None | [] columns or None | Has an empty list of columns. |
- Parameters
ctx (flytekit.core.context_manager.FlyteContext) –
lv (flytekit.models.literals.Literal) –
expected_python_type (Type[flytekit.types.structured.structured_dataset.T]) –
- Return type
flytekit.types.structured.structured_dataset.T
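A runnable sketch of the subsetting scenario above, assuming the default pandas handlers are installed:

```python
import pandas as pd
from typing_extensions import Annotated

from flytekit import kwtypes, task, workflow
from flytekit.types.structured import StructuredDataset


@task
def t1() -> Annotated[StructuredDataset, kwtypes(col_a=int, col_b=float)]:
    df = pd.DataFrame({"col_a": [1, 2], "col_b": [1.0, 2.0]})
    return StructuredDataset(dataframe=df)


@task
def t2(in_a: Annotated[StructuredDataset, kwtypes(col_b=float)]) -> int:
    # The decoder receives t2's columns (col_b only) and subsets accordingly.
    df = in_a.open(pd.DataFrame).all()
    return len(df.columns)  # 1


@workflow
def wf() -> int:
    return t2(in_a=t1())
```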
Attributes
- DECODERS: Dict[Type, Dict[str, Dict[str, flytekit.types.structured.structured_dataset.StructuredDatasetDecoder]]] = {<class 'pandas.core.frame.DataFrame'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToPandasDecodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToPandasDecodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToPandasDecodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.BQToPandasDecodingHandler object>}}, <class 'pyarrow.lib.Table'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToArrowDecodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToArrowDecodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.ParquetToArrowDecodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.BQToArrowDecodingHandler object>}}, <class 'pyspark.sql.dataframe.DataFrame'>: {'/': {'parquet': <flytekitplugins.spark.sd_transformers.ParquetToSparkDecodingHandler object>}, 's3': {'parquet': <flytekitplugins.spark.sd_transformers.ParquetToSparkDecodingHandler object>}}}#
- DEFAULT_FORMATS: Dict[Type, str] = {<class 'pandas.core.frame.DataFrame'>: 'parquet', <class 'pyarrow.lib.Table'>: 'parquet', <class 'pyspark.sql.dataframe.DataFrame'>: 'parquet'}#
- DEFAULT_PROTOCOLS: Dict[Type, str] = {<class 'pandas.core.frame.DataFrame'>: 's3', <class 'pyarrow.lib.Table'>: 's3', <class 'pyspark.sql.dataframe.DataFrame'>: 's3'}#
- ENCODERS: Dict[Type, Dict[str, Dict[str, flytekit.types.structured.structured_dataset.StructuredDatasetEncoder]]] = {<class 'pandas.core.frame.DataFrame'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.PandasToParquetEncodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.PandasToParquetEncodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.PandasToParquetEncodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.PandasToBQEncodingHandlers object>}}, <class 'pyarrow.lib.Table'>: {'/': {'parquet': <flytekit.types.structured.basic_dfs.ArrowToParquetEncodingHandler object>}, 's3': {'parquet': <flytekit.types.structured.basic_dfs.ArrowToParquetEncodingHandler object>}, 'gs': {'parquet': <flytekit.types.structured.basic_dfs.ArrowToParquetEncodingHandler object>}, 'bq': {'': <flytekit.types.structured.bigquery.ArrowToBQEncodingHandlers object>}}, <class 'pyspark.sql.dataframe.DataFrame'>: {'/': {'parquet': <flytekitplugins.spark.sd_transformers.SparkToParquetEncodingHandler object>}, 's3': {'parquet': <flytekitplugins.spark.sd_transformers.SparkToParquetEncodingHandler object>}}}#
- Handlers#
alias of Union[flytekit.types.structured.structured_dataset.StructuredDatasetEncoder, flytekit.types.structured.structured_dataset.StructuredDatasetDecoder]