Converting a Spark DataFrame to a Pandas DataFrame#

This example shows how a Spark DataFrame can be returned from a task and consumed as a pandas DataFrame. Note that the data is fully loaded into memory when opened, so a DataFrame that does not fit in memory will result in a runtime failure.

import flytekit
import pandas
from flytekit import kwtypes, task, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.spark import Spark

Define my_schema#

This section defines a simple schema type with two columns: name: str and age: int.

my_schema = FlyteSchema[kwtypes(name=str, age=int)]
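
The generated type records its declared columns: column_names() returns them in order, and create_spark_df uses this below to name the columns of the Spark DataFrame. A quick, illustrative check (the expected output is inferred from the declaration above):

print(my_schema.column_names())  # expected: ['name', 'age']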

create_spark_df is a Spark task that runs within a Spark context (and therefore relies on a Spark cluster that is up and running when executed remotely). The task generates a Spark DataFrame whose schema matches the predefined my_schema.

Notice that the task simply returns a pyspark.sql.DataFrame object, even though the declared return type is my_schema. The flytekit type system automatically converts the pyspark.sql.DataFrame to a FlyteSchema object. A FlyteSchema object is an abstract representation of a DataFrame that can conform to multiple different dataframe formats.

@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache=True,
    cache_version="1",
)
def create_spark_df() -> my_schema:
    """
    This task returns a Spark dataset that conforms to the defined schema. Failure to do so should result
    in a runtime error. TODO: runtime error enforcement
    """
    sess = flytekit.current_context().spark_session
    return sess.createDataFrame(
        [("Alice", 5), ("Bob", 10), ("Charlie", 15)],
        my_schema.column_names(),
    )

The task sum_of_all_ages accepts a parameter of type my_schema. Note that there is no expectation that the schema is a pandas DataFrame or a Spark DataFrame; it is a generic schema object. The flytekit schema object can be read into multiple formats using the open() method; the default conversion is to pandas.DataFrame. Refer to flytekit.types.schema.FlyteSchema for more details.

@task(cache=True, cache_version="1")
def sum_of_all_ages(s: my_schema) -> int:
    """
    The schema is passed to this task as a reference, which carries little to no overhead.
    The data is only loaded into memory (or downloaded, when running remotely) once ``open``
    is called on the schema.
    """
    # This, by default, returns a pandas DataFrame object. ``open`` can be parameterized to return other DataFrame types.
    reader = s.open()
    # Load the full dataset into a pandas DataFrame.
    df: pandas.DataFrame = reader.all()
    return int(df["age"].sum())
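
Because ``open`` accepts a dataframe type, the same schema can also be read back as a Spark DataFrame inside another Spark task. The sketch below is illustrative, assuming the Spark plugin registers a reader for pyspark.sql.DataFrame (the task name and spark_conf values are placeholders):

import pyspark.sql

@task(task_config=Spark(spark_conf={"spark.driver.memory": "1000M"}))
def count_rows(s: my_schema) -> int:
    # Request a pyspark.sql.DataFrame instead of the default pandas.DataFrame.
    spark_df: pyspark.sql.DataFrame = s.open(pyspark.sql.DataFrame).all()
    return spark_df.count()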

The workflow my_smart_schema connects create_spark_df to sum_of_all_ages; this works because the return type of the first task matches the parameter type of the second.

@workflow
def my_smart_schema() -> int:
    """
    This workflow shows how a simple schema can be created in Spark and passed to a python function and accessed as a
    pandas DataFrame. Flyte Schemas are abstract DataFrames and not tied to a specific memory representation.
    """
    df = create_spark_df()
    return sum_of_all_ages(s=df)

This program can be executed locally. Abstracting the DataFrame behind a schema greatly simplifies using disparate DataFrame technologies for the end user.

if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running my_smart_schema()-> {my_smart_schema()}")

Note

New DataFrame technologies can be dynamically loaded into Flytekit's TypeEngine. This requires defining a Flytekit plugin; refer to the Modin plugin if you'd like to work on one!
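
For orientation, such a plugin typically registers a handler with flytekit's SchemaEngine. The following is a rough sketch of that shape only; MyDataFrame, MyReader, and MyWriter are hypothetical placeholders, and the exact reader/writer interfaces may differ across flytekit versions:

from flytekit.types.schema import SchemaEngine, SchemaHandler, SchemaReader, SchemaWriter

class MyDataFrame:  # hypothetical dataframe type to be supported
    ...

class MyReader(SchemaReader[MyDataFrame]):  # hypothetical reader for MyDataFrame
    def iter(self, **kwargs): ...
    def all(self, **kwargs): ...

class MyWriter(SchemaWriter[MyDataFrame]):  # hypothetical writer for MyDataFrame
    def write(self, *dfs, **kwargs): ...

# Register the new dataframe type with flytekit's schema engine.
SchemaEngine.register_handler(
    SchemaHandler("MyDataFrame-Schema", MyDataFrame, MyReader, MyWriter)
)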
