- class flytekit.LaunchPlan(name, workflow, parameters, fixed_inputs, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, trigger=None, overwrite_cache=None, auto_activate=False)
Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the core concepts if you are unfamiliar with them.
Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

    from flytekit import workflow

    @workflow
    def wf(a: int, c: str) -> str:
        ...

Create the default launch plan by passing only the workflow to get_or_create:

    from flytekit.core import launch_plan

    launch_plan.LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

    launch_plan.LaunchPlan.get_or_create(
        workflow=wf,
        name="your_lp_name_1",
        default_inputs={"a": 3},
        fixed_inputs={"c": "4"},
    )

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

    from flytekit.core import launch_plan, notification
    from flytekit.core.schedule import CronSchedule
    from flytekit.models.core import execution as _execution_model

    # Cron schedule that fires every minute.
    sched = CronSchedule(schedule="*/1 * * * *", kickoff_time_input_arg="abc")
    # Email sent when an execution succeeds.
    email_notif = notification.Email(
        phases=[_execution_model.WorkflowExecutionPhase.SUCCEEDED], recipients_email=[""]
    )
    launch_plan.LaunchPlan.get_or_create(
        workflow=wf, name="your_lp_name_2", schedule=sched, notifications=[email_notif]
    )

Please see the relevant Schedule and Notification objects as well.
To configure the remaining parameters, you’ll need to import the relevant model objects as well.

    from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use them as follows:

    auth_role_model = AuthRole(assumable_iam_role="my:iam:role")
    launch_plan.LaunchPlan.get_or_create(
        workflow=wf,
        name="your_lp_name_3",
    )

    labels_model = Labels({"label": "foo"})
    annotations_model = Annotations({"annotate": "bar"})
    launch_plan.LaunchPlan.get_or_create(
        workflow=wf,
        name="your_lp_name_4",
        auth_role=auth_role_model,
        labels=labels_model,
        annotations=annotations_model,
    )

    raw_output_data_config = RawOutputDataConfig("s3://foo/output")
    launch_plan.LaunchPlan.get_or_create(
        workflow=wf, name="your_lp_name_5", raw_output_data_config=raw_output_data_config
    )

- Parameters:
name (str)
workflow (_annotated_workflow.WorkflowBase)
parameters (_interface_models.ParameterMap)
fixed_inputs (_literal_models.LiteralMap)
schedule (Optional[_schedule_model.Schedule])
notifications (Optional[List[_common_models.Notification]])
labels (Optional[_common_models.Labels])
annotations (Optional[_common_models.Annotations])
raw_output_data_config (Optional[_common_models.RawOutputDataConfig])
max_parallelism (Optional[int])
security_context (Optional[security.SecurityContext])
trigger (Optional[LaunchPlanTriggerBase])
overwrite_cache (Optional[bool])
auto_activate (bool)
- clone_with(name, parameters=None, fixed_inputs=None, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, trigger=None, overwrite_cache=None, auto_activate=False)
- Parameters:
name (str)
parameters (ParameterMap | None)
fixed_inputs (LiteralMap | None)
schedule (Schedule | None)
notifications (List[Notification] | None)
labels (Labels | None)
annotations (Annotations | None)
raw_output_data_config (RawOutputDataConfig | None)
max_parallelism (int | None)
security_context (SecurityContext | None)
trigger (LaunchPlanTriggerBase | None)
overwrite_cache (bool | None)
auto_activate (bool)
- Return type:
LaunchPlan
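The clone_with signature above makes every argument except name optional. A minimal sketch of how such a method could behave follows, assuming that an argument left as None keeps the value of the plan being cloned; this reference does not state that rule, and SketchPlan is a hypothetical stand-in, not the flytekit class.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchPlan:
    """Hypothetical stand-in holding a few launch plan attributes."""

    name: str
    max_parallelism: Optional[int] = None
    overwrite_cache: Optional[bool] = None

    def clone_with(
        self,
        name: str,
        max_parallelism: Optional[int] = None,
        overwrite_cache: Optional[bool] = None,
    ) -> "SketchPlan":
        # Assumption: an argument left as None keeps the value from this plan.
        return SketchPlan(
            name=name,
            max_parallelism=self.max_parallelism if max_parallelism is None else max_parallelism,
            overwrite_cache=self.overwrite_cache if overwrite_cache is None else overwrite_cache,
        )


base = SketchPlan(name="your_lp_name_1", max_parallelism=4)
clone = base.clone_with(name="your_lp_name_1_clone", overwrite_cache=True)
print(clone)
```

The clone gets a new name and the overridden flag, while max_parallelism carries over from the base plan, which is left unchanged.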
- classmethod create(name, workflow, default_inputs=None, fixed_inputs=None, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, auth_role=None, trigger=None, overwrite_cache=None, auto_activate=False)
- Parameters:
name (str)
workflow (WorkflowBase)
default_inputs (Dict[str, Any] | None)
fixed_inputs (Dict[str, Any] | None)
schedule (Schedule | None)
notifications (List[Notification] | None)
labels (Labels | None)
annotations (Annotations | None)
raw_output_data_config (RawOutputDataConfig | None)
max_parallelism (int | None)
security_context (SecurityContext | None)
auth_role (AuthRole | None)
trigger (LaunchPlanTriggerBase | None)
overwrite_cache (bool | None)
auto_activate (bool)
- Return type:
LaunchPlan
- static get_default_launch_plan(ctx, workflow)
Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.
- Parameters:
ctx (FlyteContext) – This is not flytekit.current_context(). This is an internal context object. Users familiar with flytekit should feel free to use this however.
workflow (WorkflowBase) – The workflow to create a launch plan for.
- Return type:
LaunchPlan
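The description above says a default launch plan picks up whatever default values the workflow function signature defines. As a rough illustration of that idea only, the sketch below reads defaults from a plain Python function with the standard inspect module. The helper signature_defaults is hypothetical, and this is not a claim about how flytekit does it internally.

```python
import inspect
from typing import Any, Callable, Dict


def signature_defaults(fn: Callable[..., Any]) -> Dict[str, Any]:
    """Collect the parameters of ``fn`` that declare a default value."""
    return {
        name: param.default
        for name, param in inspect.signature(fn).parameters.items()
        if param.default is not inspect.Parameter.empty
    }


# Plain function standing in for a workflow; no @workflow decorator is applied.
def wf(a: int, c: str = "hello") -> str:
    return f"{a}-{c}"


defaults = signature_defaults(wf)
print(defaults)  # {'c': 'hello'}
```

Here a has no default, so a caller would still have to supply it at launch time, while c can be omitted.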
- classmethod get_or_create(workflow, name=None, default_inputs=None, fixed_inputs=None, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, auth_role=None, trigger=None, overwrite_cache=None, auto_activate=False)
This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached: if get_or_create is called again with the same name, the cached version is returned.
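The two rules above (no name means the default launch plan, and results are cached by name) can be sketched in plain Python. SketchLaunchPlan is a hypothetical stand-in rather than the flytekit class, and caching the default plan under the workflow's own name is an assumption made for the sketch.

```python
from typing import Any, Dict, Optional


class SketchLaunchPlan:
    """Hypothetical stand-in for a launch plan; not the flytekit class."""

    CACHE: Dict[str, "SketchLaunchPlan"] = {}

    def __init__(self, name: str, workflow_name: str, options: Dict[str, Any]):
        self.name = name
        self.workflow_name = workflow_name
        self.options = options

    @classmethod
    def get_or_create(
        cls, workflow_name: str, name: Optional[str] = None, **options: Any
    ) -> "SketchLaunchPlan":
        # Treat arguments explicitly passed as None as "not specified".
        options = {key: value for key, value in options.items() if value is not None}
        if name is None:
            # No name means the default launch plan, which takes no other arguments.
            if options:
                raise ValueError("the default launch plan accepts no extra arguments")
            # Assumption: the default plan is cached under the workflow's own name.
            name = workflow_name
        if name in cls.CACHE:
            return cls.CACHE[name]
        plan = cls(name, workflow_name, options)
        cls.CACHE[name] = plan
        return plan


first = SketchLaunchPlan.get_or_create("wf", name="your_lp_name_1", max_parallelism=4)
second = SketchLaunchPlan.get_or_create("wf", name="your_lp_name_1")
default_plan = SketchLaunchPlan.get_or_create("wf")
print(first is second, default_plan.name)
```

The second call returns the object created by the first, because the lookup is keyed on the name alone.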
- Parameters:
security_context (SecurityContext | None) – Security context for the execution
workflow (WorkflowBase) – The Workflow to create a launch plan for.
name (str | None) – If you supply a name, keep in mind that it needs to be unique. That is, project, domain, version, and this name form a primary key. If you do not supply a name, this function will assume you want the default launch plan for the given workflow.
default_inputs (Dict[str, Any] | None) – Default inputs, expressed as Python values.
fixed_inputs (Dict[str, Any] | None) – Fixed inputs, expressed as Python values. At call time, these cannot be changed.
schedule (Schedule | None) – Optional schedule to run on.
notifications (List[Notification] | None) – Notifications to send.
labels (Labels | None) – Optional labels to attach to executions created by this launch plan.
annotations (Annotations | None) – Optional annotations to attach to executions created by this launch plan.
raw_output_data_config (RawOutputDataConfig | None) – Optional location of offloaded data for things like S3, etc.
auth_role (AuthRole | None) – Add an auth role if necessary.
max_parallelism (int | None) – Controls the maximum number of task nodes that can run in parallel across the entire workflow. This is useful for achieving fairness. Note: MapTasks are regarded as one unit, and the parallelism/concurrency of MapTasks is independent of this setting.
trigger (LaunchPlanTriggerBase | None) – [alpha] This is a new syntax for specifying schedules.
overwrite_cache (bool | None) – If set to True, the execution will always overwrite the cache.
auto_activate (bool) – If set to True, the launch plan will be activated automatically on registration. Default is False.
- Return type:
LaunchPlan
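The default_inputs and fixed_inputs parameters above differ in one respect: defaults may be replaced when an execution is launched, while fixed inputs cannot be changed. A minimal sketch of that rule follows, using a hypothetical resolve_inputs helper that is not part of flytekit.

```python
from typing import Any, Dict, Optional


def resolve_inputs(
    default_inputs: Dict[str, Any],
    fixed_inputs: Dict[str, Any],
    overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Merge launch-time overrides with default and fixed inputs."""
    overrides = overrides or {}
    locked = sorted(set(overrides) & set(fixed_inputs))
    if locked:
        raise ValueError(f"fixed inputs cannot be changed: {locked}")
    # Later mappings win: overrides replace defaults, fixed values always apply.
    return {**default_inputs, **overrides, **fixed_inputs}


print(resolve_inputs({"a": 3}, {"c": "4"}))             # {'a': 3, 'c': '4'}
print(resolve_inputs({"a": 3}, {"c": "4"}, {"a": 10}))  # {'a': 10, 'c': '4'}
```

Passing {"c": "5"} as an override raises ValueError in this sketch, mirroring the statement that fixed inputs cannot be changed at call time.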
- CACHE: Dict[str, LaunchPlan] = {}
- annotations
- fixed_inputs
- interface
- labels
- max_parallelism
- name
- notifications
- overwrite_cache
- parameters
- python_interface
- raw_output_data_config
- saved_inputs
- schedule
- security_context
- should_auto_activate
- trigger
- workflow