Converting a Spark DataFrame to a Pandas DataFrame
This example shows how a Spark DataFrame can be returned from a task and consumed as a pandas DataFrame. If the DataFrame does not fit in memory, this results in a runtime failure.
Define my_schema
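This section defines a simple schema type with two columns: name (str) and age (int).
import flytekit
import pandas
from flytekit import kwtypes, task, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.spark import Spark

# A typed schema with two columns: name (str) and age (int).
my_schema = FlyteSchema[kwtypes(name=str, age=int)]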
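my_schema fixes both the column names and their types. To illustrate what "conforming" to such a schema means, here is a minimal stdlib-only sketch that models the schema as an ordered mapping from column name to Python type and checks rows against it. SCHEMA, column_names and validate_rows are hypothetical names chosen for illustration; they are not flytekit APIs, and flytekit's own enforcement is not modeled here.

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical stand-in for a typed schema: column name -> expected Python type.
# Dicts preserve insertion order, so the column order is well defined.
SCHEMA: Dict[str, type] = {"name": str, "age": int}


def column_names(schema: Dict[str, type]) -> List[str]:
    """Return the column names in declaration order."""
    return list(schema)


def validate_rows(rows: Sequence[Tuple], schema: Dict[str, type]) -> None:
    """Raise TypeError if any row does not match the schema's arity or column types."""
    columns = column_names(schema)
    for i, row in enumerate(rows):
        if len(row) != len(columns):
            raise TypeError(f"row {i}: expected {len(columns)} values, got {len(row)}")
        for col, value in zip(columns, row):
            expected = schema[col]
            if not isinstance(value, expected):
                raise TypeError(
                    f"row {i}, column {col!r}: expected {expected.__name__}, "
                    f"got {type(value).__name__}"
                )


if __name__ == "__main__":
    validate_rows([("Alice", 5), ("Bob", 10), ("Charlie", 15)], SCHEMA)  # passes silently
    try:
        validate_rows([("Dave", "ten")], SCHEMA)
    except TypeError as err:
        print(err)  # row 0, column 'age': expected int, got str
```

Validating eagerly like this costs a pass over the data, which is one reason a framework may prefer to check only column names and types rather than every value.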
create_spark_df is a Spark task that runs within a Spark context; when executed remotely, it relies on a Spark cluster that is up and running. This task generates a Spark DataFrame whose schema matches my_schema.
Notice that the task simply returns a pyspark.sql.DataFrame object, even though its declared return type is my_schema. The flytekit type system automatically converts the pyspark.sql.DataFrame into a FlyteSchema object. A FlyteSchema is an abstract representation of a DataFrame that can be materialized in several different DataFrame formats.
@task(
    task_config=Spark(
        # Resource settings passed to the Spark driver and executors.
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache_version="1",
)
def create_spark_df() -> my_schema:
    """
    This task returns a Spark DataFrame that conforms to the defined schema. Returning data that does
    not conform should result in a runtime error, but this is not yet enforced.
    """
    # The Spark plugin makes a SparkSession available through the task's execution context.
    sess = flytekit.current_context().spark_session
    return sess.createDataFrame(
        [("Alice", 5), ("Bob", 10), ("Charlie", 15)],
        my_schema.column_names(),
    )
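The automatic conversion works because flytekit uses the task's declared return type, not only the runtime type of the returned object, to decide how to handle the value. The stdlib-only sketch below mimics that idea under simplified assumptions: a hypothetical typed_task decorator reads the function's return annotation and, when it is the hypothetical AbstractSchema type, converts the returned NativeFrame (a stand-in for a Spark DataFrame) into that abstract form. It is an analogy, not flytekit's TypeEngine.

```python
import functools
import typing
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


@dataclass
class AbstractSchema:
    """Hypothetical format-independent table: column names plus row tuples."""
    columns: List[str]
    rows: List[Tuple[Any, ...]]


@dataclass
class NativeFrame:
    """Hypothetical stand-in for an engine-specific DataFrame (e.g. a Spark one)."""
    columns: List[str]
    data: List[Tuple[Any, ...]]


def to_abstract(value: Any) -> AbstractSchema:
    """Convert a supported native object into the abstract representation."""
    if isinstance(value, AbstractSchema):
        return value
    if isinstance(value, NativeFrame):
        return AbstractSchema(columns=list(value.columns), rows=list(value.data))
    raise TypeError(f"cannot convert {type(value).__name__} to AbstractSchema")


def typed_task(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Convert the return value based on the function's declared return annotation."""
    return_type = typing.get_type_hints(fn).get("return")

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = fn(*args, **kwargs)
        if return_type is AbstractSchema:
            return to_abstract(result)
        return result

    return wrapper


@typed_task
def create_df() -> AbstractSchema:
    # Returns a native object even though the annotation says AbstractSchema.
    return NativeFrame(columns=["name", "age"], data=[("Alice", 5), ("Bob", 10)])


if __name__ == "__main__":
    out = create_df()
    print(type(out).__name__, out.columns)  # AbstractSchema ['name', 'age']
```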
The task sum_of_all_ages receives a parameter of type my_schema. There is no expectation that the schema is a pandas DataFrame or a Spark DataFrame; it is a generic schema object. A FlyteSchema can be read into multiple formats using its open() method, and the default conversion is to pandas.DataFrame. Refer to flytekit.types.schema.FlyteSchema for more details.
@task(cache_version="1")
def sum_of_all_ages(s: my_schema) -> int:
    """
    The schema is passed to this task as a reference to the data, which has little to no overhead.
    Only opening the schema loads the data into memory (downloading it first when running remotely).
    """
    # By default, ``open`` returns a reader that produces pandas DataFrames.
    # ``open`` can be parameterized to return other supported DataFrame types.
    reader = s.open()
    # Read the entire schema into a single pandas DataFrame.
    df: pandas.DataFrame = reader.all()
    return int(df["age"].sum())
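The docstring above describes a lazy pattern: the schema value is only a reference, and the data is materialized when the schema is opened and read, in the format the caller requests. As a minimal sketch of that pattern, assuming nothing about flytekit's internals, the hypothetical SchemaRef and SchemaReader classes below defer loading until all() is called. The default "records" format (a list of row dicts) plays the role of pandas.DataFrame, and "columns" stands in for an alternative format.

```python
from typing import Any, Callable, List, Tuple

Rows = List[Tuple[Any, ...]]
Loader = Callable[[], Tuple[List[str], Rows]]


class SchemaReader:
    """Hypothetical reader that materializes data in one requested format."""

    def __init__(self, load: Loader, fmt: str) -> None:
        self._load = load
        self._fmt = fmt

    def all(self) -> Any:
        columns, rows = self._load()  # the data is loaded only here
        if self._fmt == "records":
            return [dict(zip(columns, row)) for row in rows]
        if self._fmt == "columns":
            return {c: [row[i] for row in rows] for i, c in enumerate(columns)}
        raise ValueError(f"unsupported format: {self._fmt}")


class SchemaRef:
    """Hypothetical lightweight reference: holds a loader, not the data itself."""

    def __init__(self, load: Loader) -> None:
        self._load = load

    def open(self, fmt: str = "records") -> SchemaReader:
        return SchemaReader(self._load, fmt)


if __name__ == "__main__":
    calls = []

    def load():
        calls.append(1)  # count how often the (possibly remote) data is fetched
        return ["name", "age"], [("Alice", 5), ("Bob", 10), ("Charlie", 15)]

    ref = SchemaRef(load)
    print(len(calls))  # 0: creating and passing the reference loads nothing
    df = ref.open().all()
    print(sum(r["age"] for r in df))  # 30
    print(ref.open("columns").all()["age"])  # [5, 10, 15]
```

Keeping the loader separate from the reader means a reference can be passed between tasks cheaply, and each consumer pays the loading cost only for the format it actually asks for.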
The my_smart_schema workflow connects create_spark_df to sum_of_all_ages. This works because the return type of the first task matches the parameter type of the second.
@workflow
def my_smart_schema() -> int:
    """
    This workflow shows how a simple schema can be created in Spark, passed to a Python function, and
    accessed there as a pandas DataFrame. Flyte Schemas are abstract DataFrames and are not tied to a
    specific memory representation.
    """
    df = create_spark_df()
    return sum_of_all_ages(s=df)
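The wiring above is valid because the two type annotations agree. The sketch below illustrates that kind of compatibility check in plain Python with a hypothetical can_connect helper that compares a producer's return annotation with a consumer's parameter annotation. It assumes exact type equality, which is a simplification; Flyte's actual compatibility rules are not modeled here.

```python
import typing
from typing import Any, Callable, Dict, List


def can_connect(producer: Callable[..., Any], consumer: Callable[..., Any], param: str) -> bool:
    """Return True if producer's declared return type equals consumer's declared type for `param`."""
    produced = typing.get_type_hints(producer).get("return")
    expected = typing.get_type_hints(consumer).get(param)
    return produced is not None and produced == expected


# Hypothetical schema-like type shared by both functions.
Table = List[Dict[str, Any]]


def make_table() -> Table:
    return [{"name": "Alice", "age": 5}, {"name": "Bob", "age": 10}]


def sum_ages(s: Table) -> int:
    return sum(row["age"] for row in s)


def double(s: int) -> int:
    return 2 * s


if __name__ == "__main__":
    print(can_connect(make_table, sum_ages, "s"))  # True
    print(can_connect(make_table, double, "s"))    # False
    print(sum_ages(s=make_table()))                # 15
```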
The example program can also be executed locally, which greatly simplifies working with disparate DataFrame technologies for the end user.
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running my_smart_schema()-> {my_smart_schema()}")
Note
New DataFrame technologies can be loaded dynamically into Flytekit's TypeEngine. This requires defining a Flytekit plugin; refer to the Modin plugin if you'd like to work on one!
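A plugin of this kind essentially teaches the type system how to convert a new DataFrame type to and from a common representation. The stdlib-only sketch below shows that registry pattern with a hypothetical FormatRegistry and a toy ColumnFrame type. The class names and the register/to_table/read signatures are assumptions for illustration and do not reflect flytekit's real plugin interfaces.

```python
from typing import Any, Callable, Dict, List, Tuple

Table = Tuple[List[str], List[Tuple[Any, ...]]]  # (column names, rows)


class FormatRegistry:
    """Hypothetical registry mapping a DataFrame type to reader/writer functions."""

    def __init__(self) -> None:
        self._readers: Dict[type, Callable[[Table], Any]] = {}
        self._writers: Dict[type, Callable[[Any], Table]] = {}

    def register(
        self, fmt: type, reader: Callable[[Table], Any], writer: Callable[[Any], Table]
    ) -> None:
        self._readers[fmt] = reader
        self._writers[fmt] = writer

    def to_table(self, value: Any) -> Table:
        # Find a writer whose registered type matches the value.
        for fmt, writer in self._writers.items():
            if isinstance(value, fmt):
                return writer(value)
        raise TypeError(f"no plugin registered for {type(value).__name__}")

    def read(self, table: Table, fmt: type) -> Any:
        if fmt not in self._readers:
            raise TypeError(f"no plugin registered for {fmt.__name__}")
        return self._readers[fmt](table)


class ColumnFrame(dict):
    """A toy 'new DataFrame technology': a dict of column name -> list of values."""


def column_frame_reader(table: Table) -> ColumnFrame:
    columns, rows = table
    return ColumnFrame({c: [r[i] for r in rows] for i, c in enumerate(columns)})


def column_frame_writer(frame: ColumnFrame) -> Table:
    columns = list(frame)
    return columns, list(zip(*(frame[c] for c in columns)))


if __name__ == "__main__":
    registry = FormatRegistry()
    registry.register(ColumnFrame, column_frame_reader, column_frame_writer)
    table = registry.to_table(ColumnFrame(name=["Alice", "Bob"], age=[5, 10]))
    print(table)  # (['name', 'age'], [('Alice', 5), ('Bob', 10)])
    print(registry.read(table, ColumnFrame)["age"])  # [5, 10]
```

Routing every format through one shared representation means a new plugin only needs a reader and a writer, rather than a converter for every other format already supported.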