Converting a Spark DataFrame to a Pandas DataFrame
This example shows how a Spark dataset can be returned from a Flyte task and consumed as a pandas DataFrame.
First, we import the libraries.
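The example relies on the imports below (a minimal sketch based on the names used later; depending on your flytekit version, Annotated may come from typing rather than typing_extensions):

import flytekit
import pandas
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.structured import StructuredDataset
from flytekitplugins.spark import Spark
from typing_extensions import Annotated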
We define the dataset's two columns, name: str and age: int.
columns = kwtypes(name=str, age=int)
Next, we define a task that returns a Spark DataFrame.
@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def create_spark_df() -> Annotated[StructuredDataset, columns]:
    """
    This task returns a Spark dataset that conforms to the defined schema. Failure to do so should result
    in a runtime error. TODO: runtime error enforcement
    """
    sess = flytekit.current_context().spark_session
    return StructuredDataset(
        dataframe=sess.createDataFrame(
            [
                ("Alice", 5),
                ("Bob", 10),
                ("Charlie", 15),
            ],
            ["name", "age"],
        )
    )
create_spark_df is a Spark task that runs within a Spark context (and relies on a Spark cluster that is up and running). The task builds a pyspark.DataFrame, even though the return type specifies StructuredDataset; the flytekit type system automatically converts the pyspark.DataFrame to a StructuredDataset. A StructuredDataset is an abstract representation of a DataFrame that can conform to different DataFrame formats.
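Because of this automatic conversion, the explicit StructuredDataset wrapper in create_spark_df is not strictly required. Here is a minimal sketch of the same task returning the raw DataFrame and letting the type engine do the wrapping (create_spark_df_raw is a hypothetical name, and the Spark configuration is trimmed for brevity):

@task(
    task_config=Spark(spark_conf={"spark.driver.memory": "1000M"}),
    cache_version="1",
)
def create_spark_df_raw() -> Annotated[StructuredDataset, columns]:
    # Return the pyspark.DataFrame directly; flytekit converts it to a
    # StructuredDataset because of the declared return type.
    sess = flytekit.current_context().spark_session
    return sess.createDataFrame([("Alice", 5), ("Bob", 10)], ["name", "age"])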
We define a task to consume the Spark DataFrame.
@task(cache_version="1")
def sum_of_all_ages(s: Annotated[StructuredDataset, columns]) -> int:
    # Materialize the structured dataset as a pandas DataFrame and sum the ages.
    df: pandas.DataFrame = s.open(pandas.DataFrame).all()
    return int(df["age"].sum())
The task sum_of_all_ages receives a parameter of type StructuredDataset. The open method specifies the DataFrame format to materialize, pandas.DataFrame in our case. Calling all on the structured dataset loads the data into memory (or downloads it first when the task runs remotely).
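Because a StructuredDataset is not tied to pandas, the same value can be opened with any supported DataFrame type. A minimal sketch that consumes it as a pyarrow Table instead (assuming flytekit's built-in pyarrow handler is available; sum_of_all_ages_arrow is a hypothetical name):

import pyarrow as pa
import pyarrow.compute as pc

@task(cache_version="1")
def sum_of_all_ages_arrow(s: Annotated[StructuredDataset, columns]) -> int:
    # Open the structured dataset as a pyarrow.Table rather than pandas.
    table: pa.Table = s.open(pa.Table).all()
    # Sum the "age" column; pc.sum returns a pyarrow scalar.
    return int(pc.sum(table.column("age")).as_py())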
Finally, we define a workflow.
@workflow
def my_smart_structured_dataset() -> int:
    """
    This workflow shows how a simple schema can be created in Spark, passed to a Python function, and accessed as a
    pandas DataFrame. Flyte Schemas are abstract DataFrames and are not tied to a specific memory representation.
    """
    df = create_spark_df()
    return sum_of_all_ages(s=df)
You can execute the code locally!
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running my_smart_structured_dataset() -> {my_smart_structured_dataset()}")
New DataFrame types can be loaded dynamically into flytekit's TypeEngine. To register a custom DataFrame type, define an encoder and a decoder for StructuredDataset, as outlined in the Structured Dataset example.
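For orientation, the registration shape looks roughly like the skeleton below. MyDF is a hypothetical DataFrame type, and the constructor and method signatures are assumptions recalled from the Structured Dataset example; defer to that example for the authoritative API:

from flytekit.types.structured.structured_dataset import (
    StructuredDatasetDecoder,
    StructuredDatasetEncoder,
    StructuredDatasetTransformerEngine,
)

class MyDF:
    """Hypothetical custom DataFrame type, for illustration only."""

class MyDFEncoder(StructuredDatasetEncoder):
    def encode(self, ctx, structured_dataset, structured_dataset_type):
        # Persist structured_dataset.dataframe (e.g., write parquet to a URI)
        # and return a literals.StructuredDataset pointing at it.
        ...

class MyDFDecoder(StructuredDatasetDecoder):
    def decode(self, ctx, flyte_value, current_task_metadata):
        # Read the stored data back into a MyDF instance.
        ...

# Register the handlers: (python type, storage protocol, supported format).
StructuredDatasetTransformerEngine.register(MyDFEncoder(MyDF, "s3", "parquet"))
StructuredDatasetTransformerEngine.register(MyDFDecoder(MyDF, "s3", "parquet"))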
DataFrame plugins already exist for several libraries, such as Modin and Polars.