Kubernetes Spark Jobs

Flyte can execute Spark jobs natively on a Kubernetes cluster and manages the virtual cluster’s lifecycle, spin-up, and teardown. It leverages the open-source Spark On K8s Operator and can be enabled without signing up for any service. This is like running a transient Spark cluster: a cluster spun up for a specific Spark job and torn down after completion. Such clusters are better suited to production workloads but carry the extra cost of setup and teardown.

In Flyte/K8s, this cost is amortized because pods are faster to create than machines, but the penalty of downloading Docker images may still affect performance. Also, remember that starting a pod is slower than starting a process.

Flytekit makes it possible to write PySpark code natively as a task, and the Spark cluster is automatically configured using the SparkConf declared in the task decorator. The examples in this section provide a hands-on tutorial for writing PySpark tasks.
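As a rough illustration of the configuration side, the sketch below assembles a Spark configuration dictionary of the kind a task would declare. The property names are standard Spark settings; the helper name `build_spark_conf` and the example values are hypothetical.

```python
from typing import Dict


def build_spark_conf(
    executors: int,
    executor_cores: int,
    executor_memory: str,
    driver_cores: int = 1,
    driver_memory: str = "1g",
) -> Dict[str, str]:
    """Return Spark properties as strings, the form a SparkConf expects.

    Hypothetical helper: it only builds the dictionary and does not talk
    to Flyte or Spark.
    """
    if executors < 1 or executor_cores < 1 or driver_cores < 1:
        raise ValueError("executor and core counts must be positive")
    return {
        "spark.driver.cores": str(driver_cores),
        "spark.driver.memory": driver_memory,
        "spark.executor.instances": str(executors),
        "spark.executor.cores": str(executor_cores),
        "spark.executor.memory": executor_memory,
    }


# Example values only; size these for your own job.
conf = build_spark_conf(executors=2, executor_cores=1, executor_memory="2g")
```

In a real task, a dictionary like `conf` would be passed as `Spark(spark_conf=conf)` from `flytekitplugins.spark` to the `task_config` argument of the `@task` decorator.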

Note

This plugin has been tested at scale: more than 100k Spark jobs run through Flyte at Lyft. It still needs a large capacity on Kubernetes and careful configuration. We recommend using Multi-Cluster mode and enabling Resource Quotas for large and extremely frequent Spark jobs. The plugin is not recommended for extremely short-running jobs, for which a pre-spawned cluster might be better. A job can be considered short if its runtime is less than 2-3 minutes; in that case, the cost of pod bring-up outweighs the cost of execution.
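To make the trade-off concrete, the following sketch estimates what fraction of a job's wall-clock time goes to pod bring-up. The 150-second threshold is simply the midpoint of the 2-3 minute guideline, and the function names and sample durations are hypothetical.

```python
# Assumed midpoint of the 2-3 minute guideline; tune for your cluster.
SHORT_JOB_THRESHOLD_S = 150.0


def startup_overhead_fraction(startup_s: float, runtime_s: float) -> float:
    """Fraction of total wall-clock time spent bringing up pods."""
    if startup_s < 0 or runtime_s < 0:
        raise ValueError("durations must be non-negative")
    total = startup_s + runtime_s
    return startup_s / total if total else 0.0


def is_short_job(runtime_s: float, threshold_s: float = SHORT_JOB_THRESHOLD_S) -> bool:
    """True if the job's runtime falls below the short-job threshold."""
    return runtime_s < threshold_s


# Hypothetical numbers: 60 s of pod bring-up for a 60 s job means half
# of the wall-clock time is overhead.
overhead = startup_overhead_fraction(startup_s=60, runtime_s=60)
```

A job with a high overhead fraction that also counts as short is a candidate for a pre-spawned cluster instead.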

Why Use K8s Spark?

Managing Python dependencies is hard. Flyte makes it easy to version and manage dependencies using containers. The K8s Spark plugin brings all the benefits of containerization to Spark without needing to manage special Spark clusters.

Pros:

  1. Extremely easy to get started; get complete isolation between workloads

  2. Every job runs in isolation and has its own virtual cluster — no more nightmarish dependency management!

  3. Flyte manages everything for you!

Cons:

  1. Short-running, bursty jobs are not a great fit because of the container overhead

  2. No interactive Spark capabilities are available with Flyte K8s Spark, which is more suited for running ad hoc and scheduled jobs

Step 1: Deploy Spark Plugin in the Flyte Backend

Flyte Spark uses the Spark On K8s Operator and a custom-built Flyte Spark Plugin. This is a backend plugin that has to be enabled in your deployment; you can follow the steps mentioned in the enable-backend-plugins section.

You can optionally configure the plugin according to the backend config structure. An example config looks like this:

plugins:
  spark:
    spark-config-default:
      # We override the default credentials chain provider for Hadoop so that
      # it can use the serviceAccount based IAM role or ec2 metadata based.
      # This is more in line with how AWS works
      - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
      - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
      - spark.kubernetes.allocation.batch.size: "50"
      - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
      - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
      - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
      - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
      - spark.hadoop.fs.s3a.multipart.threshold: "536870912"
      - spark.blacklist.enabled: "true"
      - spark.blacklist.timeout: "5m"
      - spark.task.maxfailures: "8"

Spark Service Accounts

Spark needs a special service account (with an associated role and role bindings) to create executor pods. If you use IAM for Service Accounts or GCP Workload Identity, you need to update the service account to include the corresponding identity configuration.

You can use the Flyte cluster resource manager to create the Spark service account in each namespace. To do so, add the cluster resource templates; refer to the *spark*.yaml files.
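As an illustration of what such a service account involves, the sketch below builds the three Kubernetes objects (ServiceAccount, Role, RoleBinding) as plain dictionaries wrapped in a `List`. It is not the template shipped with Flyte: the object names, the `spark_rbac_manifest` helper, the namespace, and the rule list are all assumptions, and the verbs may need adjusting for your Spark version.

```python
import json
from typing import Any, Dict


def spark_rbac_manifest(namespace: str, name: str = "spark") -> Dict[str, Any]:
    """Build a ServiceAccount, Role, and RoleBinding for one namespace.

    Hypothetical helper; names and rules are illustrative assumptions.
    """
    role_name = f"{name}-role"
    service_account = {
        "apiVersion": "v1",
        "kind": "ServiceAccount",
        "metadata": {"name": name, "namespace": namespace},
    }
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": role_name, "namespace": namespace},
        # Assumed rule set: the driver creates and watches executor pods
        # plus the services and configmaps that go with them.
        "rules": [
            {
                "apiGroups": [""],
                "resources": ["pods", "services", "configmaps"],
                "verbs": ["create", "get", "list", "watch", "delete"],
            }
        ],
    }
    role_binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": f"{name}-role-binding", "namespace": namespace},
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "Role",
            "name": role_name,
        },
        "subjects": [
            {"kind": "ServiceAccount", "name": name, "namespace": namespace}
        ],
    }
    return {
        "apiVersion": "v1",
        "kind": "List",
        "items": [service_account, role, role_binding],
    }


# Hypothetical namespace name.
manifest = spark_rbac_manifest("my-project-development")
manifest_json = json.dumps(manifest, indent=2)
```

The resulting JSON can be reviewed by hand before anything is applied; the cluster resource templates remain the supported way to create these objects per namespace.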

Step 2: Environment Setup

  1. Install flytekitplugins-spark using pip in your environment that contains flytekit >= 0.16.0.

    pip install flytekitplugins-spark
    
  2. Build Spark image correctly as explained in How to Build Your Dockerfile for Spark on K8s.

  3. Enable the Spark plugin for Flyte, referring to the Code Examples section. Additionally, Flyte uses the SparkOperator to run Spark jobs, along with a separate K8s Service Account/Role per namespace; these are created as part of the standard Flyte deployment.

  4. Ensure you have enough resources on your K8s cluster. Based on the resources required for your Spark job (across drivers/executors), you may have to tweak resource quotas for the namespace.
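When checking a namespace quota against a job, a quick estimate of the job's total request helps. The sketch below sums CPU cores and memory across the driver and executors from standard Spark properties. The helper names and example values are hypothetical, and the result ignores any memory overhead Spark adds on top of the requested memory, so treat it as a lower bound.

```python
import re
from typing import Dict, Tuple

# Spark memory suffixes are binary units; values here are in MiB.
_UNITS_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def memory_to_mib(value: str) -> float:
    """Parse a Spark memory string such as '512m' or '2g' into MiB."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", value.strip().lower())
    if not match:
        raise ValueError(f"unsupported memory string: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNITS_MIB[unit]


def total_request(conf: Dict[str, str]) -> Tuple[int, float]:
    """Return (total cores, total memory in MiB) for driver plus executors.

    All five properties must be present; no Spark defaults are assumed.
    """
    executors = int(conf["spark.executor.instances"])
    cores = int(conf["spark.driver.cores"]) + executors * int(conf["spark.executor.cores"])
    memory = memory_to_mib(conf["spark.driver.memory"]) + executors * memory_to_mib(
        conf["spark.executor.memory"]
    )
    return cores, memory


# Example values only.
cores, memory_mib = total_request(
    {
        "spark.driver.cores": "1",
        "spark.driver.memory": "1g",
        "spark.executor.instances": "3",
        "spark.executor.cores": "2",
        "spark.executor.memory": "2g",
    }
)
```

Comparing `cores` and `memory_mib` with the namespace's resource quota shows whether the quota needs to be raised before the job can schedule all of its pods.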

How to Build Your Dockerfile for Spark on K8s

Using Spark on K8s is extremely easy and provides full versioning through the custom-built Spark container. The built container can also execute regular Spark tasks. For Spark, the image must contain the Spark dependencies and the correct entry point for the Spark driver/executors. This can be achieved using the flytekit_install_spark3.sh script, which is run as part of the Dockerfile included here.

FROM ubuntu:focal
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV DEBIAN_FRONTEND=noninteractive

# Install Python3 and other basics
RUN apt-get update && apt-get install -y python3.8 python3.8-venv make build-essential libssl-dev python3-pip curl

# Install AWS CLI to run on AWS (for GCS install GSutil). This will be removed
# in future versions to make it completely portable
RUN pip3 install awscli

ENV VENV /opt/venv
# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

# Install Python dependencies
COPY kubernetes/k8s_spark/requirements.txt /root
RUN pip install -r /root/requirements.txt

RUN flytekit_install_spark3.sh
# Adding Tini support for the spark pods
RUN wget https://github.com/krallin/tini/releases/download/v0.18.0/tini && \
    cp tini /sbin/tini && cp tini /usr/bin/tini && \
    chmod a+x /sbin/tini && chmod a+x /usr/bin/tini

# Setup Spark environment
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
ENV SPARK_HOME /opt/spark
ENV SPARK_VERSION 3.0.1
ENV PYSPARK_PYTHON ${VENV}/bin/python3
ENV PYSPARK_DRIVER_PYTHON ${VENV}/bin/python3

# Copy the makefile targets to expose on the container. This makes it easier to register.
COPY in_container.mk /root/Makefile
COPY kubernetes/k8s_spark/sandbox.config /root

# Copy the actual code
COPY kubernetes/k8s_spark/ /root/k8s_spark

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag

# Copy over the helper script that the SDK relies on
RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
RUN chmod a+x /usr/local/bin/flytekit_venv

# For spark we want to use the default entrypoint which is part of the
# distribution, also enable the virtualenv for this image.
# Note this relies on the VENV variable we've set in this image.
ENTRYPOINT ["/usr/local/bin/flytekit_venv", "/opt/entrypoint.sh"]

Step 3: Optionally, Set Up Visibility

Every time a Spark job is run, you can get a Spark application UI link to monitor the job. For historical executions, you can use the Spark History Server to retrieve the archived Spark execution history. Flyte can also create explicit links to the Spark driver logs and the individual Spark executor logs.

Spark History Server and Spark UI links are shown directly in the Flyte Console; whether they appear depends only on the configuration.

Set Up Spark Application UI (more involved)

To get a link to the Spark application UI of an in-progress Spark driver, you need to configure your Kubernetes cluster to have wildcard ingress access (*.my-domain.net) and configure the Spark On K8s Operator to create a new ingress route for every application. This is done with the Spark operator's ingress-url-format command-line option.
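The sketch below illustrates how a per-application ingress host could be derived from a URL format and checked against the wildcard domain. It assumes the format uses a `{{$appName}}` placeholder that is replaced with the application name; confirm the exact placeholder syntax in the operator documentation. The function names and the application name are hypothetical.

```python
APP_NAME_PLACEHOLDER = "{{$appName}}"  # assumed placeholder syntax


def expand_ingress_url(url_format: str, app_name: str) -> str:
    """Substitute the application name into the ingress URL format."""
    return url_format.replace(APP_NAME_PLACEHOLDER, app_name)


def matches_wildcard(host: str, wildcard: str) -> bool:
    """True if host is covered by a single-label wildcard like '*.my-domain.net'."""
    if not wildcard.startswith("*."):
        return host == wildcard
    suffix = wildcard[1:]  # for example ".my-domain.net"
    if not host.endswith(suffix):
        return False
    label = host[: -len(suffix)]
    # A wildcard covers exactly one non-empty DNS label.
    return bool(label) and "." not in label


host = expand_ingress_url("{{$appName}}.my-domain.net", "my-spark-app")
covered = matches_wildcard(host, "*.my-domain.net")
```

If `covered` is false for the hosts your format produces, the wildcard ingress will not route to the Spark UI and the link shown in the console will not resolve.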

Set Up Spark Driver and Executor Logs

Links to driver and executor logs are set up through the logs configuration of the Spark plugin. The Spark plugin uses the same default log configuration as explained in Configuring Logging Links in UI.

The Spark plugin supports separating User (Spark user code) logs from System (Spark core) logs to enhance visibility into Spark. This is only available if you can route the Spark user logs separately from the core logs; Flyte does not automatically separate the logs. Check out the configuration structure for details.

  • Mixed: Get unseparated logs from Spark Driver (both user and system), which follow the same structure as all log plugins. You can get links to the K8s dashboard, or a log aggregator of your choice, as long as it can generate standardized links.

  • User: Logs from the driver which are separated (if log separation is available)

  • System: Logs from executors. These usually do not return a unique link per executor, but rather a prefix under which all executor logs can be found

  • AllUser: All user logs across spark-submit, driver, and executors

Log config example

plugins:
    spark:
      logs:
        user:
          kubernetes-enabled: true
          kubernetes-url: <the existing k8s url you have in the main logs section>
        mixed:
          cloudwatch-enabled: true
          cloudwatch-template-uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=<LogGroupName>;prefix=var.log.containers.{{.podName}};streamFilter=typeLogStreamPrefix"
        system:
          cloudwatch-enabled: true
          cloudwatch-template-uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=<LogGroupName>;prefix=system_log.var.log.containers.{{.podName}};streamFilter=typeLogStreamPrefix"
        all-user:
          cloudwatch-enabled: true
          cloudwatch-template-uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=<LogGroupName>;prefix=var.log.containers.{{.podName}};streamFilter=typeLogStreamPrefix"
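To show how a template URI turns into a concrete link, the sketch below substitutes a pod name into a `{{.podName}}` placeholder. It only mimics the substitution and is not the plugin's implementation; the function name, the example URL, and the pod name are hypothetical.

```python
import re
from typing import Dict

# Matches placeholders such as {{.podName}} or {{ .podName }}.
_PLACEHOLDER = re.compile(r"\{\{\s*\.(\w+)\s*\}\}")


def render_log_uri(template: str, values: Dict[str, str]) -> str:
    """Replace each {{.name}} placeholder with values[name]."""

    def substitute(match: "re.Match[str]") -> str:
        key = match.group(1)
        if key not in values:
            raise KeyError(f"no value for placeholder {key!r}")
        return values[key]

    return _PLACEHOLDER.sub(substitute, template)


# Hypothetical log aggregator URL and pod name.
link = render_log_uri(
    "https://logs.example.com/search?prefix=var.log.containers.{{.podName}}",
    {"podName": "my-spark-driver"},
)
```

Rendering a template by hand like this is a cheap way to check that the resulting URL opens the expected log stream before rolling the configuration out.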

More configuration

The Spark plugin supports further configuration options. For example, if you want some Spark features enabled by default for every Spark application, you can specify default Spark configurations to be applied. Refer to the configuration structure for more details.
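The following sketch shows one way to reason about how defaults combine with a task's own settings. It assumes that values set on the task take precedence over the plugin defaults; that precedence is an assumption of this example, so verify it against the plugin's behavior. The helper names and sample values are hypothetical, and the defaults use the list-of-mappings shape of `spark-config-default`.

```python
from typing import Dict, List


def flatten_defaults(entries: List[Dict[str, str]]) -> Dict[str, str]:
    """Collapse a list of single-key mappings (the YAML list form) into one dict."""
    flat: Dict[str, str] = {}
    for entry in entries:
        flat.update(entry)
    return flat


def effective_conf(
    defaults: List[Dict[str, str]], task_conf: Dict[str, str]
) -> Dict[str, str]:
    """Merge defaults with task-level settings."""
    merged = flatten_defaults(defaults)
    merged.update(task_conf)  # assumption: task-level values win
    return merged


defaults = [
    {"spark.kubernetes.allocation.batch.size": "50"},
    {"spark.blacklist.enabled": "true"},
]
task_conf = {
    "spark.kubernetes.allocation.batch.size": "10",
    "spark.executor.instances": "4",
}
result = effective_conf(defaults, task_conf)
```

Under this assumption, `result` keeps the default `spark.blacklist.enabled` while the task's batch size replaces the default one.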

Code Examples
