flytekit.dynamic

flytekit.dynamic(_task_function=None, task_config=None, cache=False, cache_serialize=False, cache_version='', cache_ignore_input_vars=(), retries=0, interruptible=None, deprecated='', timeout=0, container_image=None, environment=None, requests=None, limits=None, secret_requests=None, *, execution_mode=ExecutionBehavior.DYNAMIC, node_dependency_hints=None, task_resolver=None, docs=None, disable_deck=None, enable_deck=None, pod_template=None, pod_template_name=None, accelerator=None)

Please first see the comments for flytekit.task() and flytekit.workflow(). This dynamic concept is an amalgamation of both and enables the user to pursue some pretty incredible constructs.

In short, a task’s function is run at execution time only, and a workflow function is run at compilation time only (local execution notwithstanding). A dynamic workflow is modeled on the backend as a task, but at execution time, the function body is run to produce a workflow. It is almost as if the decorator changed from @task to @workflow except workflows cannot make use of their inputs like native Python values whereas dynamic workflows can. The resulting workflow is passed back to the Flyte engine and is run as a subworkflow. Simple usage

@dynamic
def my_dynamic_subwf(a: int) -> (typing.List[str], int):
    s = []
    for i in range(a):
        s.append(t1(a=i))
    return s, 5

Note in the code block that we call the Python range operator on the input. This is typically not allowed in a workflow but it is here. You can even express dependencies between tasks.

@dynamic
def my_dynamic_subwf(a: int, b: int) -> int:
    x = t1(a=a)
    return t2(b=b, x=x)

See the cookbook for a longer discussion.

Parameters:
  • _task_function (Optional[Callable[..., FuncOut]])

  • task_config (Optional[T])

  • cache (bool)

  • cache_serialize (bool)

  • cache_version (str)

  • cache_ignore_input_vars (Tuple[str, ...])

  • retries (int)

  • interruptible (Optional[bool])

  • deprecated (str)

  • timeout (Union[datetime.timedelta, int])

  • container_image (Optional[Union[str, ImageSpec]])

  • environment (Optional[Dict[str, str]])

  • requests (Optional[Resources])

  • limits (Optional[Resources])

  • secret_requests (Optional[List[Secret]])

  • execution_mode (PythonFunctionTask.ExecutionBehavior)

  • node_dependency_hints (Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]])

  • task_resolver (Optional[TaskResolverMixin])

  • docs (Optional[Documentation])

  • disable_deck (Optional[bool])

  • enable_deck (Optional[bool])

  • pod_template (Optional['PodTemplate'])

  • pod_template_name (Optional[str])

  • accelerator (Optional[BaseAccelerator])

Return type:

Union[Callable[[Callable[…, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T], Callable[…, FuncOut]]