flytekit.LaunchPlan

class flytekit.LaunchPlan(name, workflow, parameters, fixed_inputs, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, trigger=None, overwrite_cache=None)[source]

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the core concepts if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

@workflow
def wf(a: int, c: str) -> str:
    ...

Create the default launch plan with

LaunchPlan.get_or_create(workflow=my_wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

launch_plan.LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_1", default_inputs={"a": 3}, fixed_inputs={"c": "4"}
)

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

Please see the relevant Schedule and Notification objects as well.

To configure the remaining parameters, you’ll need to import the relevant model objects as well.

sched = CronSchedule("* * ? * * *", kickoff_time_input_arg="abc")
email_notif = notification.Email(
    phases=[_execution_model.WorkflowExecutionPhase.SUCCEEDED], recipients_email=["my-team@email.com"]
)
launch_plan.LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_2", schedule=sched, notifications=[email_notif]
)
from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use as follows

auth_role_model = AuthRole(assumable_iam_role="my:iam:role")
launch_plan.LaunchPlan.get_or_create(
    workflow=wf,
    name="your_lp_name_3",
)

labels_model = Labels({"label": "foo"})
annotations_model = Annotations({"annotate": "bar"})
launch_plan.LaunchPlan.get_or_create(
    workflow=wf,
    name="your_lp_name_4",
    auth_role=auth_role_model,
    labels=labels_model,
    annotations=annotations_model,
)

raw_output_data_config = RawOutputDataConfig("s3://foo/output")
launch_plan.LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_5", raw_output_data_config=raw_output_data_config
)

Methods

Parameters:
  • name (str)

  • workflow (_annotated_workflow.WorkflowBase)

  • parameters (_interface_models.ParameterMap)

  • fixed_inputs (_literal_models.LiteralMap)

  • schedule (Optional[_schedule_model.Schedule])

  • notifications (Optional[List[_common_models.Notification]])

  • labels (Optional[_common_models.Labels])

  • annotations (Optional[_common_models.Annotations])

  • raw_output_data_config (Optional[_common_models.RawOutputDataConfig])

  • max_parallelism (Optional[int])

  • security_context (Optional[security.SecurityContext])

  • trigger (Optional[LaunchPlanTriggerBase])

  • overwrite_cache (Optional[bool])

clone_with(name, parameters=None, fixed_inputs=None, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, trigger=None, overwrite_cache=None)[source]
Parameters:
  • name (str)

  • parameters (ParameterMap | None)

  • fixed_inputs (LiteralMap | None)

  • schedule (Schedule | None)

  • notifications (List[Notification] | None)

  • labels (Labels | None)

  • annotations (Annotations | None)

  • raw_output_data_config (RawOutputDataConfig | None)

  • max_parallelism (int | None)

  • security_context (SecurityContext | None)

  • trigger (LaunchPlanTriggerBase | None)

  • overwrite_cache (bool | None)

Return type:

LaunchPlan

construct_node_metadata()[source]
Return type:

NodeMetadata

classmethod create(name, workflow, default_inputs=None, fixed_inputs=None, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, auth_role=None, trigger=None, overwrite_cache=None)[source]
Parameters:
  • name (str)

  • workflow (WorkflowBase)

  • default_inputs (Dict[str, Any] | None)

  • fixed_inputs (Dict[str, Any] | None)

  • schedule (Schedule | None)

  • notifications (List[Notification] | None)

  • labels (Labels | None)

  • annotations (Annotations | None)

  • raw_output_data_config (RawOutputDataConfig | None)

  • max_parallelism (int | None)

  • security_context (SecurityContext | None)

  • auth_role (AuthRole | None)

  • trigger (LaunchPlanTriggerBase | None)

  • overwrite_cache (bool | None)

Return type:

LaunchPlan

static get_default_launch_plan(ctx, workflow)[source]

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameters:
  • ctx (FlyteContext) – This is not flytekit.current_context(). This is an internal context object. Users familiar with flytekit should feel free to use this however.

  • workflow (WorkflowBase) – The workflow to create a launch plan for.

Return type:

LaunchPlan

classmethod get_or_create(workflow, name=None, default_inputs=None, fixed_inputs=None, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, auth_role=None, trigger=None, overwrite_cache=None)[source]

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached and if called again with the same name, the cached version is returned

Parameters:
  • security_context (SecurityContext | None) – Security context for the execution

  • workflow (WorkflowBase) – The Workflow to create a launch plan for.

  • name (str | None) – If you supply a name, keep it mind it needs to be unique. That is, project, domain, version, and this name form a primary key. If you do not supply a name, this function will assume you want the default launch plan for the given workflow.

  • default_inputs (Dict[str, Any] | None) – Default inputs, expressed as Python values.

  • fixed_inputs (Dict[str, Any] | None) – Fixed inputs, expressed as Python values. At call time, these cannot be changed.

  • schedule (Schedule | None) – Optional schedule to run on.

  • notifications (List[Notification] | None) – Notifications to send.

  • labels (Labels | None) – Optional labels to attach to executions created by this launch plan.

  • annotations (Annotations | None) – Optional annotations to attach to executions created by this launch plan.

  • raw_output_data_config (RawOutputDataConfig | None) – Optional location of offloaded data for things like S3, etc.

  • auth_role (AuthRole | None) – Add an auth role if necessary.

  • max_parallelism (int | None) – Controls the maximum number of tasknodes that can be run in parallel for the entire workflow. This is useful to achieve fairness. Note: MapTasks are regarded as one unit, and parallelism/concurrency of MapTasks is independent from this.

  • trigger (LaunchPlanTriggerBase | None) – [alpha] This is a new syntax for specifying schedules.

  • overwrite_cache (bool | None)

Return type:

LaunchPlan

Attributes

CACHE: Dict[str, LaunchPlan] = {}
annotations
fixed_inputs
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
saved_inputs
schedule
security_context
trigger
workflow