FlyteRemote: A Programmatic Control Plane Interface¶
For those who require programmatic access to the control plane, the remote
module enables you to perform
certain operations in a Python runtime environment.
Since this section naturally deals with the control plane, this discussion is only relevant for those who have a Flyte backend set up and have access to it (a local demo cluster will suffice as well).
Creating a FlyteRemote Object¶
The FlyteRemote
class is the entrypoint for programmatically performing operations in a Python
runtime. It can be initialized by passing in the:
Config
object: the parent configuration object that holds all the configuration information to connect to the Flyte backend.default_project
: the default project to use when fetching or executing flyte entities.default_domain
: the default domain to use when fetching or executing flyte entities.file_access
: the file access provider to use for offloading non-literal inputs/outputs.kwargs
: additional arguments that need to be passed to createSynchronousFlyteClient
.
A FlyteRemote
object can be created in various ways:
Auto¶
The Config
class’s auto()
method can be used to automatically
construct the Config
object.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
remote = FlyteRemote(config=Config.auto())
auto
also accepts a config_file
argument, which is the path to the configuration file to use.
The order of precedence that auto
follows is:
Finds all the environment variables that match the configuration variables.
If no environment variables are set, it looks for a configuration file at the path specified by the
config_file
argument.If no configuration file is found, it uses the default values.
Sandbox¶
The Config
class’s for_sandbox()
method can be used to
construct the Config
object, specifically to connect to the Flyte cluster.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
remote = FlyteRemote(config=Config.for_sandbox())
The initialization is as simple as calling for_sandbox()
on the Config
class!
This, by default, uses localhost:30081
as the endpoint, and the default minio credentials.
If the sandbox is in a hosted-like environment, then port-forward or ingress URLs need to be taken care of.
Any Endpoint¶
The Config
class’s for_endpoint()
method can be used to
construct the Config
object to connect to a specific endpoint.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
remote = FlyteRemote(
config=Config.for_endpoint(endpoint="flyte.example.net"),
default_project="flytesnacks",
default_domain="development",
)
The for_endpoint
method also accepts:
insecure
: whether to use insecure connections. Defaults toFalse
.data_config
: can be used to configure how data is downloaded or uploaded to a specific blob storage like S3, GCS, etc.config_file
: the path to the configuration file to use.
Generalized Initialization¶
The Config
class can be directly used to construct the Config
object if additional configuration is needed.
You can send PlatformConfig
, DataConfig
,
SecretsConfig
, and StatsConfig
objects to the Config
class.
|
Settings to talk to a Flyte backend. |
|
Any data storage specific configuration. |
|
Configuration for secrets. |
|
Configuration for sending statsd. |
For example:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, PlatformConfig
remote = FlyteRemote(
config=Config(
platform=PlatformConfig(
endpoint="flyte.example.net",
insecure=False,
client_id="my-client-id",
client_credentials_secret="my-client-secret",
auth_mode="client_credentials",
),
secrets=SecretsConfig(default_dir="/etc/secrets"),
)
)
Fetching Entities¶
Tasks, workflows, launch plans, and executions can be fetched using FlyteRemote.
flyte_task = remote.fetch_task(name="my_task", version="v1")
flyte_workflow = remote.fetch_workflow(name="my_workflow", version="v1")
flyte_launch_plan = remote.fetch_launch_plan(name="my_launch_plan", version="v1")
flyte_execution = remote.fetch_execution(name="my_execution")
project
and domain
can also be specified in all the fetch_*
calls.
If not specified, the default values given during the creation of the FlyteRemote object will be used.
The following is an example that fetches workflow()
:
from flytekit import workflow
task_1 = remote.fetch_task(name="core.basic.hello_world.say_hello", version="v1")
task_2 = remote.fetch_task(
name="core.basic.lp.greet",
version="v13",
project="flytesnacks",
domain="development",
)
@workflow
def my_remote_wf(name: str) -> int:
return task_2(task_1(name=name))
Another example that dynamically creates a launch plan for the my_remote_wf
workflow:
from flytekit import LaunchPlan
flyte_workflow = remote.fetch_workflow(
name="my_workflow", version="v1", project="flytesnacks", domain="development"
)
launch_plan = LaunchPlan.get_or_create(name="my_launch_plan", workflow=flyte_workflow)
Registering Entities¶
Tasks, workflows, and launch plans can be registered using FlyteRemote.
from flytekit.configuration import SerializationSettings
flyte_entity = ...
flyte_task = remote.register_task(
entity=flyte_entity,
serialization_settings=SerializationSettings(image_config=None),
version="v1",
)
flyte_workflow = remote.register_workflow(
entity=flyte_entity,
serialization_settings=SerializationSettings(image_config=None),
version="v1",
)
flyte_launch_plan = remote.register_launch_plan(entity=flyte_entity, version="v1")
entity
: the entity to register.version
: the version that will be used to register. If not specified, the version used in serialization settings will be used.serialization_settings
: the serialization settings to use. Refer toSerializationSettings
to know all the acceptable parameters.
All the additional parameters which can be sent to the register_*
methods can be found in the documentation for the corresponding method:
register_task()
, register_workflow()
,
and register_launch_plan()
.
The SerializationSettings
class accepts ImageConfig
which
holds the available images to use for the registration.
The following example showcases how to register a workflow using an existing image if the workflow is created locally:
from flytekit.configuration import ImageConfig
img = ImageConfig.from_images(
"docker.io/xyz:latest", {"spark": "docker.io/spark:latest"}
)
wf2 = remote.register_workflow(
my_remote_wf,
serialization_settings=SerializationSettings(image_config=img),
version="v1",
)
Executing Entities¶
You can execute a task, workflow, or launch plan using execute()
method
which returns a FlyteWorkflowExecution
object.
For more information on Flyte entities, see the remote flyte entities reference.
flyte_entity = ... # one of FlyteTask, FlyteWorkflow, or FlyteLaunchPlan
execution = remote.execute(
flyte_entity, inputs={...}, execution_name="my_execution", wait=True
)
inputs
: the inputs to the entity.execution_name
: the name of the execution. This is useful to avoid de-duplication of executions.wait
: synchronously wait for the execution to complete.
Additional arguments include:
project
: the project on which to execute the entity.domain
: the domain on which to execute the entity.type_hints
: a dictionary mapping Python types to their corresponding Flyte types.options
: options can be configured for a launch plan during registration or overridden during execution. Refer toOptions
to know all the acceptable parameters.
The following is an example demonstrating how to use the Options
class to configure a Flyte entity:
from flytekit.models.common import AuthRole, Labels
from flytekit.tools.translator import Options
flyte_entity = ... # one of FlyteTask, FlyteWorkflow, or FlyteLaunchPlan
execution = remote.execute(
flyte_entity,
inputs={...},
execution_name="my_execution",
wait=True,
options=Options(
raw_data_prefix="s3://my-bucket/my-prefix",
auth_role=AuthRole(assumable_iam_role="my-role"),
labels=Labels({"my-label": "my-value"}),
),
)
Retrieving & Inspecting Executions¶
After an execution is completed, you can retrieve the execution using the fetch_execution()
method.
The fetched execution can be used to retrieve the inputs and outputs of an execution.
execution = remote.fetch_execution(
name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development"
)
input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()
The inputs
and outputs
correspond to the top-level execution or the workflow itself.
To fetch a specific output, say, a model file:
model_file = execution.outputs["model_file"]
with open(model_file) as f:
# use mode
...
You can use sync()
to sync the entity object’s state with the remote state during the execution run:
synced_execution = remote.sync(execution, sync_nodes=True)
node_keys = synced_execution.node_executions.keys()
Note
During the sync, you may come across Received message larger than max (xxx vs. 4194304)
error if the message size is too large. In that case, edit the flyte-admin-base-config
config map using the command kubectl edit cm flyte-admin-base-config -n flyte
to increase the maxMessageSizeBytes
value. Refer to the troubleshooting guide in case you’ve queries about the command’s usage.
node_executions
will fetch all the underlying node executions recursively.
To fetch output of a specific node execution:
node_execution_output = synced_execution.node_executions["n1"].outputs["model_file"]
Node here, can correspond to a task, workflow, or branch node.
Listing Entities¶
To list the recent executions, use the recent_executions()
method.
recent_executions = remote.recent_executions(project="flytesnacks", domain="development", limit=10)
The limit
parameter is optional and defaults to 100.
To list tasks by version, use the list_tasks_by_version()
method.
tasks = remote.list_tasks_by_version(project="flytesnacks", domain="development", version="v1")
Terminating an Execution¶
To terminate an execution, use the terminate()
method.
execution = remote.fetch_execution(name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development")
remote.terminate(execution, cause="Code needs to be updated")