Kubernetes Spark Jobs#

Tags: Spark, Integration, DistributedComputing, Data, Advanced

Flyte can execute Spark jobs natively on a Kubernetes cluster, which manages the lifecycle, spin-up, and teardown of a virtual cluster. It leverages the open-source Spark On K8s Operator and can be enabled without signing up for any service. This is like running a transient Spark cluster: a cluster spun up for a specific Spark job and torn down after completion. Such clusters are better suited to production workloads but carry the extra cost of setup and teardown.

In Flyte, this cost is amortized because pods are faster to create than machines, although the time spent downloading Docker images may affect performance. Also, keep in mind that starting a pod is not as fast as starting a process.

Flytekit lets you write PySpark code natively as a task, and the Spark cluster is configured automatically from the SparkConf declared in the task decorator. The examples in this guide provide a hands-on tutorial for writing PySpark tasks.

Note

This plugin has been tested at scale: more than 100k Spark jobs run through Flyte at Lyft. Running at this scale still requires large capacity on Kubernetes and careful configuration. We recommend using multi-cluster mode (see deployment/cluster_config/performance: multi-cluster mode) and enabling resource quotas for large and extremely frequent Spark jobs. The plugin is not recommended for extremely short-running jobs, for which a pre-spawned cluster may be a better fit. A job can be considered short if its runtime is less than 2-3 minutes; in that case, the cost of pod bring-up outweighs the cost of execution.
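As a rough illustration of the short-job guideline, the sketch below estimates how much of a job's wall-clock time goes to pod bring-up. The function names, the 180-second threshold (the upper end of the 2-3 minute range), and the 60-second startup figure are assumptions chosen for illustration, not measured values.

```python
def startup_overhead_fraction(startup_seconds: float, runtime_seconds: float) -> float:
    """Return the share of total wall-clock time spent bringing up pods."""
    total = startup_seconds + runtime_seconds
    if total <= 0:
        raise ValueError("total duration must be positive")
    return startup_seconds / total


def is_short_job(runtime_seconds: float, threshold_seconds: float = 180.0) -> bool:
    """Treat a job as short when it runs for less than the (assumed) threshold."""
    return runtime_seconds < threshold_seconds


# Hypothetical numbers: 60 s of pod bring-up for a job that runs 2 minutes.
print(is_short_job(120))
print(round(startup_overhead_fraction(60, 120), 2))
```

With these assumed numbers, a third of the wall-clock time is startup, which is the kind of ratio where a pre-spawned cluster may be worth considering.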

Why Use Kubernetes Spark?#

Managing Python dependencies is hard. Flyte makes it easy to version and manage dependencies using containers. The K8s Spark plugin brings all the benefits of containerization to Spark without needing to manage special Spark clusters.

Pros

  1. Extremely easy to get started; get complete isolation between workloads.

  2. Every job runs in isolation and has its own virtual cluster — no more nightmarish dependency management!

  3. Flyte manages everything for you!

Cons

  1. Short-running, bursty jobs are not a great fit because of the container overhead.

  2. No interactive Spark capabilities are available with Flyte K8s Spark, which is more suited to running ad hoc and scheduled jobs.

Step 1: Deploy Spark Plugin in the Flyte Backend#

Flyte Spark uses the Spark On K8s Operator and a custom-built Flyte Spark plugin. This is a backend plugin that must be enabled in your deployment. You can follow the steps mentioned in the K8s Plugins section.

You can optionally configure the plugin according to the backend config structure; an example config is defined here. This is how it looks:

plugins:
  spark:
    spark-config-default:
      # Override the default Hadoop credentials provider chain so that it can
      # use the serviceAccount-based IAM role or the EC2 metadata-based role.
      # This is more in line with how AWS works.
      - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
      - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
      - spark.kubernetes.allocation.batch.size: "50"
      - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
      - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
      - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
      - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
      - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
      - spark.hadoop.fs.s3a.multipart.threshold: "536870912"
      # Spark 3.1+ renames spark.blacklist.* to spark.excludeOnFailure.*;
      # the old names below are deprecated.
      - spark.blacklist.enabled: "true"
      - spark.blacklist.timeout: "5m"
      - spark.task.maxFailures: "8"
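To make the shape of this configuration concrete, the following sketch flattens a list of single-key maps (the form used under spark-config-default) and overlays per-task settings on top. It is an illustration only: the function name and the per-task values are hypothetical, and the precedence shown (task-level values win over defaults) is an assumption about how defaults are typically combined, not a statement of the plugin's implementation.

```python
from typing import Dict, List

# Cluster-wide defaults, shaped like the entries under spark-config-default
# (only two entries shown).
default_entries: List[Dict[str, str]] = [
    {"spark.kubernetes.allocation.batch.size": "50"},
    {"spark.hadoop.fs.s3a.multipart.threshold": "536870912"},
]

# Hypothetical per-task settings, as they might be supplied by a task's Spark config.
task_conf: Dict[str, str] = {
    "spark.executor.instances": "4",
    "spark.kubernetes.allocation.batch.size": "10",
}


def merge_spark_conf(defaults: List[Dict[str, str]], overrides: Dict[str, str]) -> Dict[str, str]:
    """Flatten the default entries, then let per-task values win on conflicts."""
    merged: Dict[str, str] = {}
    for entry in defaults:
        merged.update(entry)
    merged.update(overrides)
    return merged


effective = merge_spark_conf(default_entries, task_conf)
for key in sorted(effective):
    print(f"{key}={effective[key]}")
```

Keeping defaults at the backend and only job-specific values in the task keeps task code short while still allowing targeted overrides.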

Spark Service Accounts#

Spark needs a special service account (with an associated role and role bindings) to create executor pods. If you use IAM roles for service accounts or GCP Workload Identity, you need to update the service account to include the corresponding identity annotation.

You can use the Flyte cluster resource manager to create the Spark service account for each namespace. For this, you need to add the cluster resource templates as shown here (refer to the spark.yaml files).

Note

Refer to this guide to use GCP instead of AWS.

Step 2: Environment Setup#

  1. Install flytekitplugins-spark using pip in the environment that contains flytekit.

    pip install flytekitplugins-spark
    
  2. Build the Spark image as explained in How to Build Your Dockerfile for Spark on Kubernetes.

  3. Enable the Spark plugin for Flyte by referring to the Code Examples section. Flyte uses the Spark operator to run Spark jobs, along with a separate K8s service account and role per namespace, which are created as part of the standard Flyte deployment.

  4. Ensure you have enough resources on your K8s cluster. Based on the resources required for your Spark job (across drivers/executors), you may have to tweak resource quotas for the namespace.
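When sizing a namespace quota, it can help to add up what a job asks for across the driver and executors. The sketch below does this from standard Spark properties; the helper names and the example values are hypothetical, and it deliberately ignores the per-pod memory overhead that Spark adds on Kubernetes, so treat the result as a lower bound.

```python
import re
from typing import Dict

# Multipliers that convert Spark size suffixes to MiB (Spark treats k, m, g, t as binary units).
_TO_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def memory_to_mib(value: str) -> float:
    """Convert a Spark-style size such as '1000M' or '2g' to MiB."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", value.strip().lower())
    if match is None:
        raise ValueError(f"unsupported memory value: {value!r}")
    number, unit = match.groups()
    return int(number) * _TO_MIB[unit]


def estimate_request(conf: Dict[str, str]) -> Dict[str, float]:
    """Sum cores and memory over one driver and all executors (overheads excluded)."""
    executors = int(conf["spark.executor.instances"])
    cores = int(conf["spark.driver.cores"]) + executors * int(conf["spark.executor.cores"])
    memory = memory_to_mib(conf["spark.driver.memory"]) + executors * memory_to_mib(
        conf["spark.executor.memory"]
    )
    return {"cores": float(cores), "memory_mib": float(memory)}


# Hypothetical job: one driver plus two executors, each with 1 core and 1000 MiB.
example_conf = {
    "spark.driver.cores": "1",
    "spark.driver.memory": "1000M",
    "spark.executor.cores": "1",
    "spark.executor.memory": "1000M",
    "spark.executor.instances": "2",
}
print(estimate_request(example_conf))
```

Comparing this estimate against the namespace's resource quota before launching large jobs gives an early signal that the quota needs adjusting.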

How to Build Your Dockerfile for Spark on Kubernetes#

Using Spark on K8s is straightforward and provides full versioning through the custom-built Spark container. The built container can also execute regular Spark tasks. For Spark, the image must contain the Spark dependencies and the correct entry point for the Spark driver and executors.

FROM apache/spark-py:3.3.1
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV DEBIAN_FRONTEND=noninteractive
ARG spark_uid=1001

## Install Python3 and other basics
USER 0
RUN apt-get update && apt-get install -y python3 python3-venv make build-essential libssl-dev python3-pip curl wget

# Install AWS CLI to run on AWS (for GCS install GSutil). This will be removed
# in future versions to make it completely portable
RUN pip3 install awscli

WORKDIR /opt
RUN curl https://sdk.cloud.google.com > install.sh
RUN bash /opt/install.sh --install-dir=/opt
ENV PATH $PATH:/opt/google-cloud-sdk/bin
WORKDIR /root

# Virtual environment
ENV VENV /opt/venv
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"
RUN pip3 install wheel

# Install Python dependencies
COPY k8s_spark/requirements.txt /root
RUN pip install -r /root/requirements.txt

RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar -P /opt/spark/jars && \
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.563/aws-java-sdk-bundle-1.11.563.jar -P /opt/spark/jars

# Copy the makefile targets to expose on the container. This makes it easier to register.
# Delete this after we update CI
COPY in_container.mk /root/Makefile

# Delete this after we update CI to not serialize inside the container
COPY k8s_spark/sandbox.config /root

# Copy the actual code
COPY k8s_spark/ /root/k8s_spark

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag

# Copy over the helper script that the SDK relies on
RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
RUN chmod a+x /usr/local/bin/flytekit_venv

# Set /root user and group
RUN chown -R ${spark_uid}:${spark_uid} /root

# For spark we want to use the default entrypoint which is part of the
# distribution, also enable the virtualenv for this image.
ENTRYPOINT ["/opt/entrypoint.sh"]

USER ${spark_uid}

Step 3: Optionally, Set Up Visibility#

Every time a Spark job runs, you can get a Spark application UI link to monitor the job. For historical executions, you can use the Spark History Server to retrieve the archived Spark execution history. Flyte can also create explicit links to the Spark driver logs and the individual Spark executor logs.

Spark History Server and Spark UI links are shown in the Flyte Console and depend on the following configuration:

Set Up Spark Application UI (more involved)#

To get a link to the Spark application UI of an in-progress Spark driver, you need to configure your Kubernetes cluster for wildcard ingress access (for example, *.my-domain.net) and configure the Spark On K8s Operator to create a new ingress route for every application. You can do this with the Spark operator's command-line option called ingress-url-format.
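The sketch below illustrates how a wildcard domain and a per-application URL format fit together. It assumes the format string uses the placeholders {{$appName}} and {{$appNamespace}}, which is how the operator's documentation describes ingress-url-format to the best of our knowledge; verify this against the operator version you deploy. The function, application name, and namespace are hypothetical.

```python
def render_ingress_url(url_format: str, app_name: str, app_namespace: str) -> str:
    """Substitute the (assumed) operator placeholders with concrete values."""
    return (
        url_format
        .replace("{{$appName}}", app_name)
        .replace("{{$appNamespace}}", app_namespace)
    )


# Hypothetical format that maps each application to a host under *.my-domain.net.
url_format = "{{$appName}}.my-domain.net"
print(render_ingress_url(url_format, "hello-spark-1", "flytesnacks-development"))
```

Because every application gets its own host name, the wildcard ingress rule is what lets these per-application routes resolve without extra DNS changes.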

Set Up Spark Driver and Executor Logs#

You can set this up through the logs section of the Spark plugin configuration. The Spark plugin uses the same default log configuration as explained in Configuring Logging Links in UI.

The Spark plugin supports separating user logs (Spark user code) from system logs (Spark core) to improve visibility into Spark. This is only available if you can route the Spark user logs separately from the core logs; Flyte does not separate the logs automatically. Check out the configuration structure here.

  • Mixed: Unseparated logs from the Spark driver (both user and system), which follow the same structure as all log plugins. You can get links to the K8s dashboard or to a log aggregator of your choice, as long as it can generate standardized links.

  • User: Logs from the driver, separated from system logs (if log separation is available).

  • System: Logs from executors. These usually do not resolve to a unique link per executor; the link is more like a prefix under which all executor logs can be found.

  • AllUser: All user logs across spark-submit, the driver, and the executors.

Log config example

plugins:
    spark:
      logs:
        user:
          kubernetes-enabled: true
          kubernetes-url: <the existing k8s url you have in the main logs section>
        mixed:
          cloudwatch-enabled: true
          cloudwatch-template-uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=<LogGroupName>;prefix=var.log.containers.{{.podName}};streamFilter=typeLogStreamPrefix"
        system:
          cloudwatch-enabled: true
          cloudwatch-template-uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=<LogGroupName>;prefix=system_log.var.log.containers.{{.podName}};streamFilter=typeLogStreamPrefix"
        all-user:
          cloudwatch-enabled: true
          cloudwatch-template-uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=<LogGroupName>;prefix=var.log.containers.{{.podName}};streamFilter=typeLogStreamPrefix"
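The template URIs above contain a {{.podName}} placeholder that is filled in for each pod. As a minimal sketch of that substitution (the function and the pod name are hypothetical, and only the placeholder that appears in the example config is handled):

```python
def render_log_link(template_uri: str, pod_name: str) -> str:
    """Fill the {{.podName}} placeholder in a log template URI."""
    return template_uri.replace("{{.podName}}", pod_name)


# Template taken from the "mixed" entry of the example config.
template = (
    "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1"
    "#logStream:group=<LogGroupName>;prefix=var.log.containers.{{.podName}};"
    "streamFilter=typeLogStreamPrefix"
)

# Hypothetical driver pod name.
link = render_log_link(template, "hello-spark-driver")
print(link)
```

This also shows why the System link behaves like a prefix: the rendered URI filters log streams by a name prefix rather than pointing at a single stream.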

More configuration#

The Spark plugin supports further configuration options. For example, if you want some Spark features to be enabled by default for every Spark application, you can define default Spark configurations that are applied to every job. Refer to the configuration structure for more details.

Code Examples#

Writing a PySpark Task

Converting a Spark DataFrame to a Pandas DataFrame
