Using Secrets in a Task#

Flyte supports running a wide variety of tasks, from containers to SQL queries and service calls. In order for Flyte-run containers to request and access secrets, Flyte provides a native Secret construct.

For a simple task that launches a Pod, the flow will look something like this:


  1. Flyte invokes a plugin to create the K8s object. This can be a Pod or a more complex CRD (e.g. Spark, PyTorch, etc.)


    The plugin will ensure that labels and annotations are passed through to any Pod that will be spawned due to the creation of the CRD.

  2. Flyte will apply labels and annotations that reference all secrets the task is requesting access to.

  3. Flyte will send a POST request to ApiServer to create the object.

  4. Before persisting the Pod, ApiServer will invoke all registered Pod Webhooks. Flyte’s Pod Webhook will be called.

  5. Flyte Pod Webhook will then, using the labels and annotations attached in step 2, look up globally mounted secrets for each of the requested secrets.

  6. If found, Pod Webhook will mount them directly in the Pod. If not found, it will inject the appropriate annotations to load the secrets from K8s (or Vault or Confidant or any other configured secret management system plugin) into the task pod.

Once the secret is injected into the task pod, Flytekit can read it using the secret manager (see examples below).

The webhook is included in all overlays in the Flytekit repo. The deployment file creates (mainly) two things: a Job and a Deployment.

  1. flyte-pod-webhook-secrets Job: This job runs the flytepropeller webhook init-certs command, which issues a self-signed CA certificate as well as a derived TLS certificate and its private key. It stores them in a new secret named flyte-pod-webhook-secret.

  2. flyte-pod-webhook Deployment: This deployment creates the Webhook pod which creates a MutatingWebhookConfiguration on startup. This serves as the registration contract with the ApiServer to know about the Webhook before it starts serving traffic.

Secret Discovery#

Flyte identifies secrets using a secret group and a secret key. In a task decorator you request a secret like this:

@task(secret_requests=[Secret(group=SECRET_GROUP, key=SECRET_NAME)])

Flytekit provides a shorthand for loading the requested secret inside a task:

secret = flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_NAME)

See the Python examples further down for more details on how to request and use secrets in a task.

Flytekit relies on the following environment variables to load secrets (defined here). When running tasks and workflows locally, make sure to store your secrets accordingly or to modify these variables:

  • FLYTE_SECRETS_DEFAULT_DIR - The directory Flytekit searches for secret files, default: “/etc/secrets”

  • FLYTE_SECRETS_FILE_PREFIX - A common file prefix for Flyte secrets, default: “”

  • FLYTE_SECRETS_ENV_PREFIX - A common env var prefix for Flyte secrets, default: “_FSEC_”
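As a rough illustration of how these three settings could combine into a lookup location, the sketch below derives an environment variable name and a file path for a secret. The helper names secret_env_var and secret_file are hypothetical, and the composition rule (prefix, then group and key joined by an underscore and upper-cased for environment variables; directory/group/prefix+key for files) is an assumption made for illustration. Check the Flytekit source for the authoritative behaviour.

```python
from pathlib import PurePosixPath


def secret_env_var(group: str, key: str, env_prefix: str = "_FSEC_") -> str:
    """Build an environment variable name for a secret.

    ASSUMPTION: prefix + GROUP + "_" + KEY, upper-cased. This mirrors the
    defaults quoted in the text, not a confirmed Flytekit contract.
    """
    return f"{env_prefix}{group}_{key}".upper()


def secret_file(
    group: str,
    key: str,
    default_dir: str = "/etc/secrets",
    file_prefix: str = "",
) -> str:
    """Build a file path for a secret.

    ASSUMPTION: <default_dir>/<group>/<file_prefix><key>, lower-cased.
    """
    return str(PurePosixPath(default_dir) / group.lower() / f"{file_prefix}{key.lower()}")


env_name = secret_env_var("user-info", "user_secret")
file_path = secret_file("user-info", "user_secret")
print(env_name)
print(file_path)
```

When running locally, exporting a value under the derived variable name, or writing it to the derived path, is one way to make a secret discoverable under these assumptions.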

When running a workflow on a Flyte cluster, the configured secret manager will use the secret Group and Key to try and retrieve a secret. If successful, it will make the secret available as either file or environment variable and will if necessary modify the above variables automatically so that the task can load and use the secrets.

Configuring a secret management system plugin#

When a task requests a secret, Flytepropeller will try to retrieve it in the following order:

  1. Checking for global secrets (secrets mounted as files or environment variables on the flyte-pod-webhook pod).

  2. Checking with an additional configurable secret manager.

Note that the global secrets take precedence over any secret discoverable by the secret manager plugins.
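The precedence rule can be modelled in a few lines. This is a simplified sketch, not the webhook's implementation: the real webhook mutates the Pod (mounting secrets or adding annotations) rather than returning values, and the names resolve_secret, global_secrets and secret_manager are hypothetical.

```python
from typing import Callable, Mapping, Optional, Tuple

SecretId = Tuple[str, str]  # (group, key)


def resolve_secret(
    group: str,
    key: str,
    global_secrets: Mapping[SecretId, str],
    secret_manager: Callable[[str, str], Optional[str]],
) -> Optional[str]:
    """Return the secret value, preferring global secrets over the plugin."""
    # Step 1: global secrets always win when present.
    value = global_secrets.get((group, key))
    if value is not None:
        return value
    # Step 2: fall back to the configured secret manager plugin.
    return secret_manager(group, key)


# Hypothetical data: one global secret, and a plugin that knows every key.
global_secrets = {("user-info", "user_secret"): "from-global"}


def fake_manager(group: str, key: str) -> Optional[str]:
    return "from-manager"


shadowed = resolve_secret("user-info", "user_secret", global_secrets, fake_manager)
fallback = resolve_secret("user-info", "password", global_secrets, fake_manager)
```

Here the first lookup is served from the global secrets even though the plugin could also answer it, while the second falls through to the plugin.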

The following additional secret managers are available at the time of writing:

  • K8s secrets (default) - flyte-pod-webhook will try to look for a K8s secret named after the secret Group and retrieve the value for the secret Key.

  • AWS Secret Manager - flyte-pod-webhook will add the AWS Secret Manager sidecar container to a task Pod which will mount the secret.

  • Vault Agent Injector - flyte-pod-webhook will annotate the task Pod with the respective Vault annotations that trigger an existing Vault Agent Injector to retrieve the specified secret Key from a vault path defined as secret Group.

You can configure the additional secret manager by setting secretManagerType to ‘K8s’, ‘AWS’ or ‘Vault’ in the core config of Flytepropeller.

When using the K8s secret manager plugin (enabled by default), the secrets need to be available in the same namespace as the task execution (for example flytesnacks-development). K8s secrets can be either mounted as files or injected as environment variables into the task pod, so if you need to make larger files available to the task, this might be the better option. Furthermore, this method allows you to have separate credentials for different domains while still using the same name for the secret. The group of the secret request corresponds to the K8s secret name, while the key of the request corresponds to the key of the specific entry in the secret.
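To make the group/key mapping concrete, the following sketch builds a Kubernetes Secret manifest as a Python dictionary. The helper name k8s_secret_manifest and the placeholder value are illustrative assumptions. The group and key names are the ones used in the examples on this page, and the namespace is the example namespace mentioned above.

```python
import base64
import json
from typing import Dict


def k8s_secret_manifest(group: str, entries: Dict[str, str], namespace: str) -> dict:
    """Build a K8s Secret whose name is the Flyte secret group.

    Each entry key corresponds to the `key` of a Flyte secret request.
    Values under `data` must be base64-encoded, per the K8s Secret spec.
    """
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": group, "namespace": namespace},
        "type": "Opaque",
        "data": {
            key: base64.b64encode(value.encode("utf-8")).decode("ascii")
            for key, value in entries.items()
        },
    }


# Placeholder value for illustration only; never commit real secrets.
manifest = k8s_secret_manifest(
    group="user-info",
    entries={"user_secret": "value"},
    namespace="flytesnacks-development",
)
print(json.dumps(manifest, indent=2))
```

The printed JSON can be applied with kubectl. Creating a secret with the same name in another domain's namespace (with different values) is what lets each domain keep separate credentials.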

When using the Vault secret manager, make sure you have Vault Agent deployed on your cluster (step-by-step tutorial). Vault secrets can only be mounted as files and will become available under “/etc/flyte/secrets/SECRET_GROUP/SECRET_NAME”. Vault comes with two versions of the key-value secret store. By default the Vault secret manager will try to retrieve Version 2 secrets. You can specify the KV version by setting webhook.vaultSecretManager.kvVersion in the configmap. Note that the version number needs to be an explicit string (e.g. “1”). You can also configure the Vault role under which Flyte will try to read the secret by setting webhook.vaultSecretManager.role (default: “flyte”).
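Since Vault secrets arrive as files under a group/name directory layout, reading one amounts to a plain file read. The sketch below shows that layout with a hypothetical helper, read_mounted_secret, and runs it against a temporary directory so it works outside a cluster. Inside a task you would normally go through flytekit.current_context().secrets instead.

```python
import tempfile
from pathlib import Path


def read_mounted_secret(group: str, key: str, base_dir: str = "/etc/flyte/secrets") -> str:
    """Read a file-mounted secret from <base_dir>/<group>/<key>."""
    return (Path(base_dir) / group / key).read_text()


# Simulate the mount in a temporary directory (assumption: same layout
# as the path quoted in the text, with only the base directory swapped).
with tempfile.TemporaryDirectory() as tmp:
    secret_path = Path(tmp) / "user-info" / "user_secret"
    secret_path.parent.mkdir(parents=True)
    secret_path.write_text("value")
    loaded = read_mounted_secret("user-info", "user_secret", base_dir=tmp)
```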

How to Use Secrets Injection in a Task#

This feature is available in Flytekit v0.17.0+. This example explains how a secret can be accessed in a Flyte task. Flyte provides different types of secrets as part of the SecurityContext. However, users writing Python tasks can only access secrets either as environment variables or as files injected into the task.

import os
import flytekit
from typing import Tuple

Flytekit exposes a type/class called Secret. It can be imported as follows.

from flytekit import Secret, task, workflow

A Secret consists of a name and an enum that indicates how the secret will be accessed. If the mount_requirement is not specified, the secret will be injected as an environment variable if possible. Ideally, you need not worry about the mounting requirement; just specify the name that matches the secret declared in the Flyte backend.

Let us declare a secret named user_secret in a secret group user-info. A secret group can have multiple secrets associated with it. Optionally it may also have a group_version. The version helps in rotating secrets. If not specified, the task will always retrieve the latest version. Though not recommended, some users may want the task version to be bound to a secret version.

SECRET_NAME = "user_secret"
SECRET_GROUP = "user-info"

Now declare the secret in the requests. The request tells Flyte to make the secret available to the task. The secret can then be accessed inside the task using the flytekit.ExecutionParameters, through the global flytekit context as shown below. At runtime, flytekit looks inside the task pod for an environment variable or a mounted file with a predefined name/path and loads the value.

@task(secret_requests=[Secret(group=SECRET_GROUP, key=SECRET_NAME)])
def secret_task() -> str:
    secret_val = flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_NAME)
    # Please do not print the secret value, we are doing so just as a demonstration
    return secret_val


  • In case of failure to access the secret (it is not found at execution time) an error is raised.

  • Secrets group and key are required parameters during declaration and usage. Failure to specify them raises a ValueError.

In some cases you may have multiple secrets, and sometimes they may be grouped as one secret in the SecretStore. For example, in Kubernetes secrets, it is possible to nest multiple keys under the same secret. In this case, the name would be the actual name of the nested secret, and the group would be the identifier for the Kubernetes secret.

As an example, define two secrets, username and password, in the group user-info:

USERNAME_SECRET = "username"
PASSWORD_SECRET = "password"

The Secret structure allows passing two fields, matching the key and the group, as previously described:

@task(
    secret_requests=[Secret(key=USERNAME_SECRET, group=SECRET_GROUP), Secret(key=PASSWORD_SECRET, group=SECRET_GROUP)])
def user_info_task() -> Tuple[str, str]:
    secret_username = flytekit.current_context().secrets.get(SECRET_GROUP, USERNAME_SECRET)
    secret_pwd = flytekit.current_context().secrets.get(SECRET_GROUP, PASSWORD_SECRET)
    # Please do not print the secret value, this is just a demonstration.
    return secret_username, secret_pwd

It is also possible to force Flyte to mount the secret as a file or as an environment variable. The File type is useful for large secrets that do not fit in environment variables, typically asymmetric keys (certs etc.). Another reason may be that a dependent library requires the secret to be available as a file. In these scenarios you can specify the mount_requirement. In the following example we force the mounting to be an environment variable:

@task(secret_requests=[Secret(group=SECRET_GROUP, key=SECRET_NAME, mount_requirement=Secret.MountType.ENV_VAR)])
def secret_file_task() -> Tuple[str, str]:
    # SM here is a handle to the secrets manager
    sm = flytekit.current_context().secrets
    f = sm.get_secrets_file(SECRET_GROUP, SECRET_NAME)
    secret_val = sm.get(SECRET_GROUP, SECRET_NAME)
    # returning the filename and the secret_val
    return f, secret_val

These tasks can be used in your workflow as usual:

@workflow
def my_secret_workflow() -> Tuple[str, str, str, str, str]:
    x = secret_task()
    y, z = user_info_task()
    f, s = secret_file_task()
    return x, y, z, f, s

The simplest way to test secret accessibility is to export the secret as an environment variable. Some helper methods are available to do so:

from flytekit.testing import SecretsManager

if __name__ == "__main__":
    sec = SecretsManager()
    os.environ[sec.get_secrets_env_var(SECRET_GROUP, SECRET_NAME)] = "value"
    os.environ[sec.get_secrets_env_var(SECRET_GROUP, USERNAME_SECRET)] = "username_value"
    os.environ[sec.get_secrets_env_var(SECRET_GROUP, PASSWORD_SECRET)] = "password_value"
    x, y, z, f, s = my_secret_workflow()
    assert x == "value"
    assert y == "username_value"
    assert z == "password_value"
    assert f == sec.get_secrets_file(SECRET_GROUP, SECRET_NAME)
    assert s == "value"

Scaling the Webhook#

Vertical Scaling#

To scale the Webhook to be able to process the number/rate of pods you need, you may need to configure a vertical pod autoscaler.

Horizontal Scaling#

The Webhook does not make any external API requests in response to Pod mutation requests, so it should be able to handle traffic quickly. For horizontal scaling, adding replicas for the Pod in the deployment should be sufficient. A single MutatingWebhookConfiguration object will be used, the same TLS certificate will be shared across the pods, and the Service created will automatically load balance traffic across the available pods.
