FlyteRemote: A Programmatic Control Plane Interface
For those who require programmatic access to the control plane, the remote module enables you to perform certain operations in a Python runtime environment.
Because this section deals with the control plane, it is only relevant if you have access to a running Flyte backend (a local demo cluster is sufficient).
Creating a FlyteRemote Object
The FlyteRemote class is the entrypoint for programmatically performing operations in a Python runtime. It can be initialized by passing in the following:
- Config object: the parent configuration object that holds all the configuration information to connect to the Flyte backend.
- default_project: the default project to use when fetching or executing Flyte entities.
- default_domain: the default domain to use when fetching or executing Flyte entities.
- file_access: the file access provider to use for offloading non-literal inputs/outputs.
- kwargs: additional arguments that need to be passed to create the SynchronousFlyteClient.
A FlyteRemote object can be created in various ways:
Auto
The Config class's auto() method can be used to automatically construct the Config object.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
remote = FlyteRemote(config=Config.auto())
auto also accepts a config_file argument, which is the path to the configuration file to use.
The order of precedence that auto follows is:
1. Finds all the environment variables that match the configuration variables.
2. If no environment variables are set, it looks for a configuration file at the path specified by the config_file argument.
3. If no configuration file is found, it uses the default values.
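The lookup order above can be sketched in plain Python. This is only an illustration of the precedence rule, not flytekit's implementation: the function resolve_setting, the environment variable DEMO_FLYTE_ENDPOINT, and the platform section with its url key are hypothetical names chosen for the example, and the sketch assumes the rule is applied per setting.

```python
import configparser
import os
from typing import Mapping, Optional


def resolve_setting(
    env_var: str,
    section: str,
    key: str,
    default: str,
    config_file: Optional[str] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Return a setting using env var > config file > default precedence."""
    environ = os.environ if environ is None else environ

    # 1. An environment variable wins if it is set.
    if env_var in environ:
        return environ[env_var]

    # 2. Otherwise fall back to the configuration file, if one is readable.
    if config_file is not None:
        parser = configparser.ConfigParser()
        # read() returns the list of files it parsed; missing files are skipped.
        if parser.read(config_file) and parser.has_option(section, key):
            return parser.get(section, key)

    # 3. Finally, use the built-in default.
    return default


# Hypothetical names throughout; only the precedence order comes from the text.
from_env = resolve_setting(
    "DEMO_FLYTE_ENDPOINT", "platform", "url", "localhost:30081",
    environ={"DEMO_FLYTE_ENDPOINT": "flyte.example.net"},
)
from_default = resolve_setting(
    "DEMO_FLYTE_ENDPOINT", "platform", "url", "localhost:30081", environ={}
)
print(from_env, from_default)
```

Passing environ explicitly keeps the example independent of the shell it runs in; in normal use the argument would be omitted so that os.environ is consulted.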
Sandbox
The Config class's for_sandbox() method can be used to construct the Config object specifically for connecting to a Flyte sandbox cluster.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
remote = FlyteRemote(config=Config.for_sandbox())
The initialization is as simple as calling for_sandbox() on the Config class.
By default, this uses localhost:30081 as the endpoint, along with the default minio credentials.
If the sandbox runs in a hosted-like environment, you need to set up port forwarding or use the ingress URLs so that the endpoint is reachable.
Any Endpoint
The Config class's for_endpoint() method can be used to construct the Config object to connect to a specific endpoint.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
remote = FlyteRemote(
    config=Config.for_endpoint(endpoint="flyte.example.net"),
    default_project="flytesnacks",
    default_domain="development",
)
The for_endpoint method also accepts:
- insecure: whether to use insecure connections. Defaults to False.
- data_config: can be used to configure how data is downloaded or uploaded to a specific blob storage like S3, GCS, etc.
- config_file: the path to the configuration file to use.
Generalized Initialization
The Config class can be used directly to construct the Config object if additional configuration is needed.
You can pass PlatformConfig, DataConfig, SecretsConfig, and StatsConfig objects to the Config class.
| PlatformConfig | Settings to talk to a Flyte backend. |
| DataConfig | Any data storage specific configuration. |
| SecretsConfig | Configuration for secrets. |
| StatsConfig | Configuration for sending statsd. |
For example:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, PlatformConfig, SecretsConfig
remote = FlyteRemote(
    config=Config(
        platform=PlatformConfig(
            endpoint="flyte.example.net",
            insecure=False,
            client_id="my-client-id",
            client_credentials_secret="my-client-secret",
            auth_mode="client_credentials",
        ),
        secrets=SecretsConfig(default_dir="/etc/secrets"),
    )
)
Fetching Entities
Tasks, workflows, launch plans, and executions can be fetched using FlyteRemote.
flyte_task = remote.fetch_task(name="my_task", version="v1")
flyte_workflow = remote.fetch_workflow(name="my_workflow", version="v1")
flyte_launch_plan = remote.fetch_launch_plan(name="my_launch_plan", version="v1")
flyte_execution = remote.fetch_execution(name="my_execution")
project and domain can also be specified in all the fetch_* calls.
If not specified, the default values given during the creation of the FlyteRemote object will be used.
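The fallback rule described above can be modeled in a few lines. This is a sketch of the behavior, not flytekit code: resolve_scope is a hypothetical helper, and raising an error when neither an explicit value nor a default exists is an assumption made for the example.

```python
from typing import Optional, Tuple


def resolve_scope(
    project: Optional[str],
    domain: Optional[str],
    default_project: Optional[str],
    default_domain: Optional[str],
) -> Tuple[str, str]:
    """Pick the explicit project/domain if given, else the remote's defaults."""
    resolved_project = project or default_project
    resolved_domain = domain or default_domain
    if resolved_project is None or resolved_domain is None:
        # Assumption for this sketch: a missing value is treated as an error.
        raise ValueError("project and domain must be given explicitly or as defaults")
    return resolved_project, resolved_domain


# Explicit arguments override the defaults set on the remote object.
explicit = resolve_scope("other-project", "production", "flytesnacks", "development")
# With nothing passed, the defaults are used.
fallback = resolve_scope(None, None, "flytesnacks", "development")
print(explicit, fallback)
```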
The following is an example that fetches two tasks and composes them in a workflow():
from flytekit import workflow
task_1 = remote.fetch_task(name="core.basic.hello_world.say_hello", version="v1")
task_2 = remote.fetch_task(
    name="core.basic.lp.greet",
    version="v13",
    project="flytesnacks",
    domain="development",
)
@workflow
def my_remote_wf(name: str) -> int:
    return task_2(task_1(name=name))
Another example that dynamically creates a launch plan for a fetched workflow:
from flytekit import LaunchPlan
flyte_workflow = remote.fetch_workflow(
    name="my_workflow", version="v1", project="flytesnacks", domain="development"
)
launch_plan = LaunchPlan.get_or_create(name="my_launch_plan", workflow=flyte_workflow)
Registering Entities
Tasks, workflows, and launch plans can be registered using FlyteRemote.
from flytekit.configuration import SerializationSettings
flyte_entity = ...
flyte_task = remote.register_task(
    entity=flyte_entity,
    serialization_settings=SerializationSettings(image_config=None),
    version="v1",
)
flyte_workflow = remote.register_workflow(
    entity=flyte_entity,
    serialization_settings=SerializationSettings(image_config=None),
    version="v1",
)
flyte_launch_plan = remote.register_launch_plan(entity=flyte_entity, version="v1")
- entity: the entity to register.
- version: the version that will be used to register. If not specified, the version used in serialization settings will be used.
- serialization_settings: the serialization settings to use. Refer to SerializationSettings to know all the acceptable parameters.
All the additional parameters that can be passed to the register_* methods can be found in the documentation for the corresponding method: register_task(), register_workflow(), and register_launch_plan().
The SerializationSettings class accepts an ImageConfig, which holds the images available for the registration.
The following example showcases how to register a workflow using an existing image if the workflow is created locally:
from flytekit.configuration import ImageConfig, SerializationSettings
img = ImageConfig.from_images(
    "docker.io/xyz:latest", {"spark": "docker.io/spark:latest"}
)
wf2 = remote.register_workflow(
    my_remote_wf,
    serialization_settings=SerializationSettings(image_config=img),
    version="v1",
)
Executing Entities
You can execute a task, workflow, or launch plan using the execute() method, which returns a FlyteWorkflowExecution object.
For more information on Flyte entities, see the remote flyte entities reference.
flyte_entity = ... # one of FlyteTask, FlyteWorkflow, or FlyteLaunchPlan
execution = remote.execute(
    flyte_entity, inputs={...}, execution_name="my_execution", wait=True
)
- inputs: the inputs to the entity.
- execution_name: the name of the execution. This is useful to avoid duplicate executions.
- wait: synchronously wait for the execution to complete.
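One way to obtain a stable execution_name is to derive it from the inputs, so that re-submitting the same inputs yields the same name. The following is a minimal sketch under stated assumptions: the helper execution_name_for is hypothetical, and the 20-character length and leading letter simply mirror the example execution name fb22e306a0d91e1c6000 used elsewhere in this document rather than a documented limit.

```python
import hashlib
import json
from typing import Any, Mapping


def execution_name_for(inputs: Mapping[str, Any], length: int = 20) -> str:
    """Derive a deterministic name from the inputs (hypothetical helper)."""
    # Serialize with sorted keys so that key order does not change the hash.
    payload = json.dumps(inputs, sort_keys=True, default=str).encode("utf-8")
    digest = hashlib.sha256(payload).hexdigest()
    # Prefix with a letter so the name never starts with a digit.
    return ("f" + digest)[:length]


name_a = execution_name_for({"name": "flyte", "n": 3})
name_b = execution_name_for({"n": 3, "name": "flyte"})  # same inputs, other order
name_c = execution_name_for({"name": "other", "n": 3})  # different inputs
print(name_a == name_b, name_a == name_c)
```

The resulting string could then be passed as execution_name to execute(); how the backend treats a name that already exists is not covered by this sketch.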
Additional arguments include:
- project: the project on which to execute the entity.
- domain: the domain on which to execute the entity.
- type_hints: a dictionary mapping input names to the Python types used to convert the input values to their corresponding Flyte types.
- options: options can be configured for a launch plan during registration or overridden during execution. Refer to Options to know all the acceptable parameters.
The following is an example demonstrating how to use the Options class to configure a Flyte entity:
from flytekit.models.common import AuthRole, Labels
from flytekit.tools.translator import Options
flyte_entity = ... # one of FlyteTask, FlyteWorkflow, or FlyteLaunchPlan
execution = remote.execute(
    flyte_entity,
    inputs={...},
    execution_name="my_execution",
    wait=True,
    options=Options(
        raw_data_prefix="s3://my-bucket/my-prefix",
        auth_role=AuthRole(assumable_iam_role="my-role"),
        labels=Labels({"my-label": "my-value"}),
    ),
)
Retrieving & Inspecting Executions
After an execution is completed, you can retrieve the execution using the fetch_execution() method.
The fetched execution can be used to retrieve the inputs and outputs of an execution.
execution = remote.fetch_execution(
    name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development"
)
input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()
The inputs and outputs correspond to the top-level execution or the workflow itself.
To fetch a specific output, say, a model file:
model_file = execution.outputs["model_file"]
with open(model_file) as f:
    # use the model
    ...
You can use sync() to sync the entity object's state with the remote state during the execution run:
synced_execution = remote.sync(execution, sync_nodes=True)
node_keys = synced_execution.node_executions.keys()
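Repeated syncing can be wrapped in a small polling loop. The sketch below is generic and runs without a Flyte backend: poll_until_done and its parameters are hypothetical, and the "execution" is simulated with a counter so that the loop can be exercised locally.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until_done(
    state: T,
    sync: Callable[[T], T],
    is_done: Callable[[T], bool],
    interval_seconds: float = 5.0,
    max_polls: int = 120,
) -> T:
    """Refresh `state` with `sync` until `is_done(state)` or polls run out."""
    for _ in range(max_polls):
        if is_done(state):
            return state
        time.sleep(interval_seconds)
        state = sync(state)
    # Check once more after the final sync before giving up.
    if is_done(state):
        return state
    raise TimeoutError(f"not done after {max_polls} polls")


# Stand-in for a remote execution: a counter that is "done" once it reaches 3.
final_state = poll_until_done(
    0, sync=lambda n: n + 1, is_done=lambda n: n >= 3, interval_seconds=0.0
)
print(final_state)
```

With a real backend, this might be called as poll_until_done(execution, sync=remote.sync, is_done=lambda e: e.is_done), assuming the execution object exposes an is_done property. Passing wait=True to execute() is the simpler option when blocking at submission time is acceptable.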
Note
During the sync, you may come across a Received message larger than max (xxx vs. 4194304) error if the message size is too large. In that case, edit the flyte-admin-base-config config map using the command kubectl edit cm flyte-admin-base-config -n flyte to increase the maxMessageSizeBytes value. Refer to the troubleshooting guide if you have questions about the command's usage.
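The 4194304 in the error message is 4 MiB expressed in bytes. Assuming maxMessageSizeBytes is given in bytes, as its name suggests, a new value can be computed as follows; the 16 MiB target is an arbitrary example, not a recommended setting.

```python
MIB = 1024 * 1024  # bytes in one mebibyte

# The limit quoted in the error message, in bytes.
default_limit_bytes = 4194304


def mib_to_bytes(mib: int) -> int:
    """Convert a size in MiB to bytes."""
    return mib * MIB


default_limit_mib = default_limit_bytes // MIB
new_limit_bytes = mib_to_bytes(16)  # example target only
print(default_limit_mib, new_limit_bytes)
```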
node_executions will fetch all the underlying node executions recursively.
To fetch the output of a specific node execution:
node_execution_output = synced_execution.node_executions["n1"].outputs["model_file"]
A node here can correspond to a task, workflow, or branch node.
Listing Entities
To list the recent executions, use the recent_executions() method.
recent_executions = remote.recent_executions(project="flytesnacks", domain="development", limit=10)
The limit parameter is optional and defaults to 100.
To list tasks by version, use the list_tasks_by_version() method.
tasks = remote.list_tasks_by_version(project="flytesnacks", domain="development", version="v1")
Terminating an Execution
To terminate an execution, use the terminate() method.
execution = remote.fetch_execution(name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development")
remote.terminate(execution, cause="Code needs to be updated")