K8s Operator

This guide gives an overview of setting up the K8s operator backend plugins (PyTorch, TensorFlow, MPI, and Spark) in your Flyte deployment.

  1. Add the Flyte chart repo to Helm

helm repo add flyteorg https://flyteorg.github.io/flyte
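
Then refresh your local chart index so the latest Flyte charts are visible (standard Helm commands, shown here as a quick sanity check):

helm repo update
helm search repo flyteorg/flyte-core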
  2. Set up the cluster

  • Start the sandbox cluster

    flytectl sandbox start
    
  • Generate Flytectl sandbox config

    flytectl config init
    
  • Make sure you have an up-and-running Flyte cluster in AWS / GCP

  • Make sure you have the correct kubeconfig and have selected the correct Kubernetes context

  • Make sure you have the correct Flytectl config at ~/.flyte/config.yaml (a quick sanity check follows this list)
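
    # A quick sanity check before proceeding (the sandbox installs Flyte into
    # the flyte namespace; adjust the namespace for an AWS / GCP cluster):
    kubectl config current-context
    kubectl get pods -n flyte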

  3. Install the K8s operators (a verification snippet follows this list).

  • Add PyTorch repository

    git clone https://github.com/kubeflow/pytorch-operator.git
    
  • Build & apply PyTorch operator

    export KUBECONFIG=$KUBECONFIG:~/.kube/config:~/.flyte/k3s/k3s.yaml
    kustomize build pytorch-operator/manifests/overlays/kubeflow | kubectl apply -f -
    
  • Add training-operator repository

    git clone https://github.com/kubeflow/training-operator.git
    
  • Build & apply TensorFlow operator

    export KUBECONFIG=$KUBECONFIG:~/.kube/config:~/.flyte/k3s/k3s.yaml
    kustomize build training-operator/manifests/overlays/kubeflow | kubectl apply -f -
    
  • Add MPI repository

    git clone https://github.com/kubeflow/mpi-operator.git
    
  • Build & apply MPI operator

    export KUBECONFIG=$KUBECONFIG:~/.kube/config:~/.flyte/k3s/k3s.yaml
    kustomize build mpi-operator/manifests/overlays/kubeflow | kubectl apply -f -
    
  • Add Spark repository

    helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
    
  • Install Spark Operator

    helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace
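
    # If the installs succeeded, each operator registers a CRD. You can verify
    # them in one go (the CRD names below are the usual defaults and may vary
    # across operator versions):
    kubectl get crd pytorchjobs.kubeflow.org
    kubectl get crd tfjobs.kubeflow.org
    kubectl get crd mpijobs.kubeflow.org
    kubectl get crd sparkapplications.sparkoperator.k8s.io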
    
  4. Create a file named values-override.yaml and add the following config to it (a note on combining plugins follows these snippets):

  • Enable PyTorch backend plugin

    configmap:
      enabled_plugins:
        # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
        tasks:
          # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)
          task-plugins:
            # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
            # plugins
            enabled-plugins:
              - container
              - sidecar
              - k8s-array
              - pytorch
            default-for-task-types:
              container: container
              sidecar: sidecar
              container_array: k8s-array
              pytorch: pytorch
    
  • Enable TensorFlow backend plugin

    configmap:
      enabled_plugins:
        # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
        tasks:
          # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)
          task-plugins:
            # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
            # plugins
            enabled-plugins:
              - container
              - sidecar
              - k8s-array
              - tensorflow
            default-for-task-types:
              container: container
              sidecar: sidecar
              container_array: k8s-array
              tensorflow: tensorflow
    
  • Enable MPI backend plugin

    configmap:
      enabled_plugins:
        # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
        tasks:
          # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)
          task-plugins:
            # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
            # plugins
            enabled-plugins:
              - container
              - sidecar
              - k8s-array
              - mpi
            default-for-task-types:
              container: container
              sidecar: sidecar
              container_array: k8s-array
              mpi: mpi
    
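Each snippet above enables a single plugin. To run several operators behind one Flyte deployment, merge the entries rather than repeating the whole block. A sketch combining all three (keep only the plugins you actually installed):

    configmap:
      enabled_plugins:
        tasks:
          task-plugins:
            enabled-plugins:
              - container
              - sidecar
              - k8s-array
              - pytorch
              - tensorflow
              - mpi
            default-for-task-types:
              container: container
              sidecar: sidecar
              container_array: k8s-array
              pytorch: pytorch
              tensorflow: tensorflow
              mpi: mpi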

Since the sandbox uses MinIO for object storage, the Spark plugin needs this additional configuration:

cluster_resource_manager:
  # -- Enables the Cluster resource manager component
  enabled: true
  # -- Configmap for ClusterResource parameters
  config:
    # -- ClusterResource parameters
    # Refer to the [structure](https://pkg.go.dev/github.com/lyft/flyteadmin@v0.3.37/pkg/runtime/interfaces#ClusterResourceConfig) to customize.
    cluster_resources:
      refreshInterval: 5m
      templatePath: "/etc/flyte/clusterresource/templates"
      customData:
        - production:
            - projectQuotaCpu:
                value: "5"
            - projectQuotaMemory:
                value: "4000Mi"
        - staging:
            - projectQuotaCpu:
                value: "2"
            - projectQuotaMemory:
                value: "3000Mi"
        - development:
            - projectQuotaCpu:
                value: "4"
            - projectQuotaMemory:
                value: "5000Mi"
      refresh: 5m

  # -- Resource templates that should be applied
  templates:
    # -- Template for namespaces resources
    - key: aa_namespace
      value: |
        apiVersion: v1
        kind: Namespace
        metadata:
          name: {{ namespace }}
        spec:
          finalizers:
          - kubernetes

    - key: ab_project_resource_quota
      value: |
        apiVersion: v1
        kind: ResourceQuota
        metadata:
          name: project-quota
          namespace: {{ namespace }}
        spec:
          hard:
            limits.cpu: {{ projectQuotaCpu }}
            limits.memory: {{ projectQuotaMemory }}

    - key: ac_spark_role
      value: |
        apiVersion: rbac.authorization.k8s.io/v1
        kind: Role
        metadata:
          name: spark-role
          namespace: {{ namespace }}
        rules:
        - apiGroups: ["*"]
          resources: ["pods"]
          verbs: ["*"]
        - apiGroups: ["*"]
          resources: ["services"]
          verbs: ["*"]
        - apiGroups: ["*"]
          resources: ["configmaps", "persistentvolumeclaims"]
          verbs: ["*"]

    - key: ad_spark_service_account
      value: |
        apiVersion: v1
        kind: ServiceAccount
        metadata:
          name: spark
          namespace: {{ namespace }}

    - key: ae_spark_role_binding
      value: |
        apiVersion: rbac.authorization.k8s.io/v1
        kind: RoleBinding
        metadata:
          name: spark-role-binding
          namespace: {{ namespace }}
        roleRef:
          apiGroup: rbac.authorization.k8s.io
          kind: Role
          name: spark-role
        subjects:
        - kind: ServiceAccount
          name: spark
          namespace: {{ namespace }}

sparkoperator:
  enabled: true
  plugin_config:
    plugins:
      spark:
        # -- Spark default configuration
        spark-config-default:
          # We override the default credentials chain provider for Hadoop so that
          # it can use the serviceAccount based IAM role or ec2 metadata based.
          # This is more in line with how AWS works
          - spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
          - spark.hadoop.fs.s3a.endpoint: "http://minio.flyte.svc.cluster.local:9000"
          - spark.hadoop.fs.s3a.access.key: "minio"
          - spark.hadoop.fs.s3a.secret.key: "miniostorage"
          - spark.hadoop.fs.s3a.path.style.access: "true"
          - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
          - spark.kubernetes.allocation.batch.size: "50"
          - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
          - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.multipart.threshold: "536870912"
          - spark.excludeOnFailure.enabled: "true"
          - spark.excludeOnFailure.timeout: "5m"
          - spark.task.maxFailures: "8"
configmap:
  enabled_plugins:
    # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
    tasks:
      # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)
      task-plugins:
        # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
        # plugins
        enabled-plugins:
          - container
          - sidecar
          - k8s-array
          - spark
        default-for-task-types:
          container: container
          sidecar: sidecar
          container_array: k8s-array
          spark: spark

For a non-sandbox deployment (e.g. on AWS), use the following configuration instead; it relies on the default AWS credentials provider chain rather than MinIO credentials:

cluster_resource_manager:
  # -- Enables the Cluster resource manager component
  enabled: true
  # -- Configmap for ClusterResource parameters
  config:
    # -- ClusterResource parameters
    # Refer to the [structure](https://pkg.go.dev/github.com/lyft/flyteadmin@v0.3.37/pkg/runtime/interfaces#ClusterResourceConfig) to customize.
    cluster_resources:
      refreshInterval: 5m
      templatePath: "/etc/flyte/clusterresource/templates"
      customData:
        - production:
            - projectQuotaCpu:
                value: "5"
            - projectQuotaMemory:
                value: "4000Mi"
        - staging:
            - projectQuotaCpu:
                value: "2"
            - projectQuotaMemory:
                value: "3000Mi"
        - development:
            - projectQuotaCpu:
                value: "4"
            - projectQuotaMemory:
                value: "3000Mi"
      refresh: 5m

  # -- Resource templates that should be applied
  templates:
    # -- Template for namespaces resources
    - key: aa_namespace
      value: |
        apiVersion: v1
        kind: Namespace
        metadata:
          name: {{ namespace }}
        spec:
          finalizers:
          - kubernetes

    - key: ab_project_resource_quota
      value: |
        apiVersion: v1
        kind: ResourceQuota
        metadata:
          name: project-quota
          namespace: {{ namespace }}
        spec:
          hard:
            limits.cpu: {{ projectQuotaCpu }}
            limits.memory: {{ projectQuotaMemory }}

    - key: ac_spark_role
      value: |
        apiVersion: rbac.authorization.k8s.io/v1
        kind: Role
        metadata:
          name: spark-role
          namespace: {{ namespace }}
        rules:
        - apiGroups: ["*"]
          resources:
          - pods
          verbs:
          - '*'
        - apiGroups: ["*"]
          resources:
          - services
          verbs:
          - '*'
        - apiGroups: ["*"]
          resources:
          - configmaps
          verbs:
          - '*'

    - key: ad_spark_service_account
      value: |
        apiVersion: v1
        kind: ServiceAccount
        metadata:
          name: spark
          namespace: {{ namespace }}

    - key: ae_spark_role_binding
      value: |
        apiVersion: rbac.authorization.k8s.io/v1
        kind: RoleBinding
        metadata:
          name: spark-role-binding
          namespace: {{ namespace }}
        roleRef:
          apiGroup: rbac.authorization.k8s.io
          kind: Role
          name: spark-role
        subjects:
        - kind: ServiceAccount
          name: spark
          namespace: {{ namespace }}

sparkoperator:
  enabled: true
  plugin_config:
    plugins:
      spark:
        # -- Spark default configuration
        spark-config-default:
          # We override the default credentials chain provider for Hadoop so that
          # it can use the serviceAccount based IAM role or ec2 metadata based.
          # This is more in line with how AWS works
          - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
          - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
          - spark.kubernetes.allocation.batch.size: "50"
          - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
          - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.multipart.threshold: "536870912"
          - spark.excludeOnFailure.enabled: "true"
          - spark.excludeOnFailure.timeout: "5m"
          - spark.task.maxFailures: "8"
configmap:
  enabled_plugins:
    # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
    tasks:
      # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig)
      task-plugins:
        # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
        # plugins
        enabled-plugins:
          - container
          - sidecar
          - k8s-array
          - spark
        default-for-task-types:
          container: container
          sidecar: sidecar
          container_array: k8s-array
          spark: spark
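
Before upgrading, you can dry-run the release with the same flags to catch YAML mistakes early (--dry-run is stock Helm behavior; nothing is applied to the cluster):

helm upgrade flyte flyteorg/flyte-core -f https://raw.githubusercontent.com/flyteorg/flyte/master/charts/flyte-core/values-sandbox.yaml -f values-override.yaml -n flyte --dry-run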
  5. Upgrade the Flyte Helm release.

helm upgrade flyte flyteorg/flyte-core -f https://raw.githubusercontent.com/flyteorg/flyte/master/charts/flyte-core/values-sandbox.yaml -f values-override.yaml -n flyte
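
If the new plugins don't seem to take effect, restarting FlytePropeller forces it to reload the configmap (flytepropeller is assumed to be the chart's default deployment name):

kubectl rollout restart deployment/flytepropeller -n flyte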
  6. Register the plugin examples.

flytectl register files --config ~/.flyte/config.yaml https://github.com/flyteorg/flytesnacks/releases/download/v0.3.75/snacks-cookbook-integrations-kubernetes-kfpytorch.tar.gz --archive -p flytesnacks -d development --version latest
# TODO: https://github.com/flyteorg/flyte/issues/1757
flytectl register files --config ~/.flyte/config.yaml https://github.com/flyteorg/flytesnacks/releases/download/v0.3.75/snacks-cookbook-integrations-kubernetes-kftensorflow.tar.gz --archive -p flytesnacks -d development --version latest
flytectl register files --config ~/.flyte/config.yaml https://github.com/flyteorg/flytesnacks/releases/download/v0.3.75/snacks-cookbook-integrations-kubernetes-kfmpi.tar.gz --archive -p flytesnacks -d development --version latest
flytectl register files --config ~/.flyte/config.yaml https://github.com/flyteorg/flytesnacks/releases/download/v0.3.75/snacks-cookbook-integrations-kubernetes-k8s_spark.tar.gz --archive -p flytesnacks -d development --version latest
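
You can confirm the workflows landed in the project before launching anything; flytectl's get subcommand with the same project/domain flags should list them:

flytectl get workflow --config ~/.flyte/config.yaml -p flytesnacks -d development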
  7. Launch an execution

  • Navigate to the Flyte Console UI (e.g., the sandbox console) and find the relevant workflow

  • Click on Launch to open up a launch form

  • Specify spark as the service account if you're launching a Spark example

  • Submit the form to launch an execution

  • Generate an execution spec as a YAML file

    flytectl get launchplan --config ~/.flyte/config.yaml --project flytesnacks --domain development kfpytorch.pytorch_mnist.pytorch_training_wf  --latest --execFile exec_spec.yaml
    
  • Launch! 🚀

    flytectl --config ~/.flyte/config.yaml create execution -p <project> -d <domain> --execFile exec_spec.yaml
    
  • Generate an execution spec as a YAML file

    flytectl get launchplan --config ~/.flyte/config.yaml --project flytesnacks --domain development <TODO: https://github.com/flyteorg/flyte/issues/1757>  --latest --execFile exec_spec.yaml
    
  • Launch! 🚀

    flytectl --config ~/.flyte/config.yaml create execution -p <project> -d <domain> --execFile exec_spec.yaml
    
  • Generate an execution spec as a YAML file

    flytectl get launchplan --config ~/.flyte/config.yaml --project flytesnacks --domain development kfmpi.mpi_mnist.horovod_training_wf  --latest --execFile exec_spec.yaml
    
  • Launch! 🚀

    flytectl --config ~/.flyte/config.yaml create execution -p <project> -d <domain> --execFile exec_spec.yaml
    
  • Generate an execution spec as a YAML file

    flytectl get launchplan --config ~/.flyte/config.yaml --project flytesnacks --domain development k8s_spark.pyspark_pi.my_spark  --latest --execFile exec_spec.yaml
    
  • Set kubeServiceAcct to spark in the exec_spec.yaml file

  • Launch! 🚀

    flytectl --config ~/.flyte/config.yaml create execution -p <project> -d <domain> --execFile exec_spec.yaml
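
    # Optionally follow the run from the CLI; <execution_name> is printed by
    # the create command above:
    flytectl get execution --config ~/.flyte/config.yaml -p <project> -d <domain> <execution_name>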