Data Engineering#

Flyte is well-suited for data engineering use cases, where you can interleave SQL queries with data processing logic implemented in Python with whichever data processing tools you prefer.

In this example, we create an ETL workflow that extracts data from a public RNA database, performs some simple transforms on the data, and loads it into a CSV file.


First, we define an extract_task task using the flytekitplugins-sqlalchemy plugin, which provides an interface to perform SQL queries via the SQLAlchemyTask and SQLAlchemyConfig classes.

import os

import flytekit
import pandas as pd
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.file import CSVFile
from flytekitplugins.sqlalchemy import SQLAlchemyConfig, SQLAlchemyTask


extract_task = SQLAlchemyTask(
    "extract_rna",
    query_template=(
        "select len as sequence_length, timestamp from rna "
        "where len >= {{ .inputs.min_length }} and len <= {{ .inputs.max_length }} "
        "limit {{ .inputs.limit }}"
    ),
    inputs=kwtypes(min_length=int, max_length=int, limit=int),
    output_schema_type=pd.DataFrame,
    task_config=SQLAlchemyConfig(uri="postgresql://..."),  # connection URI truncated
)

You can parameterize the query_template with {{ .inputs.<input_name> }} placeholders; the inputs keyword-type specification (kwtypes) maps each task argument name to its expected type.
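To build intuition for how those placeholders resolve, here is a plain-Python sketch of the substitution. The interpolate helper is hypothetical, written only to mirror the behavior; it is not part of flytekit:

```python
import re


def interpolate(query_template: str, **inputs) -> str:
    """Hypothetical helper mimicking {{ .inputs.<name> }} substitution."""
    return re.sub(
        r"\{\{\s*\.inputs\.(\w+)\s*\}\}",
        lambda match: str(inputs[match.group(1)]),
        query_template,
    )


template = (
    "select len as sequence_length, timestamp from rna "
    "where len >= {{ .inputs.min_length }} limit {{ .inputs.limit }}"
)
print(interpolate(template, min_length=50, limit=5))
# select len as sequence_length, timestamp from rna where len >= 50 limit 5
```

Each keyword argument must match a name declared in the kwtypes specification, which is how Flyte type-checks the inputs before interpolating them into the query.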


Assuming your connection credentials are available in the configured secrets management system (Kubernetes secrets by default), you can request access to them via the secret_requests argument of the SQLAlchemyTask constructor, then pass a secret_connect_args argument to the SQLAlchemyConfig constructor.
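A minimal sketch of that wiring, using flytekit's Secret class with hypothetical secret group and key names ("rna-db-creds", "password"); the exact shape of secret_connect_args may vary across plugin versions:

```python
from flytekit import Secret, kwtypes
from flytekitplugins.sqlalchemy import SQLAlchemyConfig, SQLAlchemyTask

# "rna-db-creds" and "password" are hypothetical group/key names for the
# credentials stored in the configured secrets management system.
extract_task = SQLAlchemyTask(
    "extract_rna",
    query_template="select len as sequence_length, timestamp from rna limit {{ .inputs.limit }}",
    inputs=kwtypes(limit=int),
    task_config=SQLAlchemyConfig(
        uri="postgresql://...",  # credentials omitted from the URI itself
        secret_connect_args={"password": Secret(group="rna-db-creds", key="password")},
    ),
    secret_requests=[Secret(group="rna-db-creds", key="password")],
)
```

At execution time, Flyte resolves each requested secret and injects it into the connection arguments, so credentials never need to appear in the workflow code or the query template.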


Next, we parse the raw timestamps and represent the time as separate date and time columns. Notice that we can encode the assumptions we have about this task’s resource requirements with the Resources object. If those assumptions ever change, we can update the resource request here or override it at the workflow level with the with_overrides method.

@task(requests=Resources(mem="700Mi"))  # example resource request
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Add date and time columns; drop timestamp column."""
    timestamp = pd.to_datetime(df["timestamp"])
    df["date"] = timestamp.dt.date
    df["time"] = timestamp.dt.time
    df.drop("timestamp", axis=1, inplace=True)
    return df
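To see what this transform does in isolation, here is the same pandas logic run on a toy frame outside of Flyte (the two sample rows are made up for illustration):

```python
import pandas as pd

# Toy input resembling the extract task's output.
df = pd.DataFrame({
    "sequence_length": [62, 87],
    "timestamp": ["2014-05-29 13:51:05", "2014-05-29 14:02:11"],
})

# Same steps as the transform task body above.
timestamp = pd.to_datetime(df["timestamp"])
df["date"] = timestamp.dt.date
df["time"] = timestamp.dt.time
df.drop("timestamp", axis=1, inplace=True)

print(df.columns.tolist())  # ['sequence_length', 'date', 'time']
print(str(df.loc[0, "date"]))  # 2014-05-29
```

The single timestamp column is split into a datetime.date column and a datetime.time column, matching the schema of the CSV produced downstream.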


Finally, we load the transformed data into its final destination: a CSV file in blob storage. Flyte has a built-in CSVFile type that automatically handles serializing/deserializing and uploading/downloading the file as it’s passed from one task to the next. All you need to do is write the file to some local location and pass that location to the path argument of CSVFile.

@task
def load(df: pd.DataFrame) -> CSVFile:
    """Load the dataframe to a csv file."""
    csv_file = os.path.join(flytekit.current_context().working_directory, "rna_df.csv")
    df.to_csv(csv_file, index=False)
    return CSVFile(path=csv_file)

ETL Workflow#

Putting all the pieces together, we create an etl_workflow that produces a dataset based on the parameters you give it.

@workflow
def etl_workflow(
    min_length: int = 50, max_length: int = 200, limit: int = 10
) -> CSVFile:
    """Build an extract, transform and load pipeline."""
    return load(
        df=transform(
            df=extract_task(min_length=min_length, max_length=max_length, limit=limit)
        )
    )
During local execution, this CSV file lives in a random local directory, but when the workflow is run on a Flyte cluster, this file lives in the configured blob store, like S3 or GCS.

Running this workflow locally, we can access the CSV file and read it into a pandas.DataFrame.

csv_file = etl_workflow(limit=5)
df = pd.read_csv(csv_file)
df

Connecting to db postgresql://
Interpolated query select len as sequence_length, timestamp from rna where len >= 50 and len <= 200 limit 5

   sequence_length        date      time
0               62  2014-05-29  13:51:05
1               87  2014-05-29  13:51:05
2              141  2014-05-29  13:51:05
3               67  2014-05-29  13:51:05
4              142  2014-05-29  13:51:05

Workflows as Reusable Components#

Because Flyte tasks and workflows are simply functions, we can embed etl_workflow as part of a larger workflow, where it’s used to create a CSV file that’s then consumed by downstream tasks or subworkflows:

@task
def aggregate(file: CSVFile) -> pd.DataFrame:
    data = pd.read_csv(file)
    ...  # process the data further
    return data


@task
def plot(data: pd.DataFrame):
    ...  # create a plot

@workflow
def downstream_workflow(
    min_length: int = 50, max_length: int = 200, limit: int = 10
):
    """A downstream workflow that visualizes an aggregation of the data."""
    csv_file = etl_workflow(min_length=min_length, max_length=max_length, limit=limit)
    return plot(data=aggregate(file=csv_file))


Prefer other data processing frameworks? Flyte ships with Polars, Dask, Modin, Spark, Vaex, and dbt integrations.

For database connectors, Flyte provides first-party support for AWS Athena, Google BigQuery, Snowflake, SQLAlchemy, and SQLite3.
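For example, SQLite3 support ships with flytekit itself; a minimal sketch, assuming the flytekit.extras.sqlite3 module's SQLite3Task/SQLite3Config API, with a hypothetical database archive URL and tracks table:

```python
from flytekit import kwtypes
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task

# Hypothetical placeholders: a compressed SQLite database archive and a
# "tracks" table inside it.
EXAMPLE_DB = "https://example.com/example_db.zip"

sqlite_task = SQLite3Task(
    name="sqlite3.query_tracks",
    query_template="select * from tracks limit {{ .inputs.limit }}",
    inputs=kwtypes(limit=int),
    task_config=SQLite3Config(uri=EXAMPLE_DB, compressed=True),
)
```

Like the SQLAlchemyTask above, this task slots directly into a workflow and uses the same {{ .inputs.<input_name> }} templating convention.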