dynamic(_task_function: Optional[Callable] = None, task_config: Optional[Any] = None, cache: bool = False, cache_version: str = '', retries: int = 0, interruptable: bool = False, deprecated: str = '', timeout: Union[datetime.timedelta, int] = 0, container_image: Optional[str] = None, environment: Optional[Dict[str, str]] = None, requests: Optional[flytekit.core.resources.Resources] = None, limits: Optional[flytekit.core.resources.Resources] = None, secret_requests: Optional[List[flytekit.models.security.Secret]] = None, *, execution_mode: Optional[flytekit.core.python_function_task.PythonFunctionTask.ExecutionBehavior] = <ExecutionBehavior.DYNAMIC: 2>) → Union[Callable, flytekit.core.python_function_task.PythonFunctionTask]¶
In short, a task’s function is run at execution time only, and a workflow function is run at compilation time only (local execution notwithstanding). A dynamic workflow is modeled on the backend as a task, but at execution time, the function body is run to produce a workflow. It is almost as if the decorator changed from
@workflowexcept workflows cannot make use of their inputs like native Python values whereas dynamic workflows can. The resulting workflow is passed back to the Flyte engine and is run as a subworkflow. Simple usage
@dynamic def my_dynamic_subwf(a: int) -> (typing.List[str], int): s =  for i in range(a): s.append(t1(a=i)) return s, 5
Note in the code block that we call the Python
rangeoperator on the input. This is typically not allowed in a workflow but it is here. You can even express dependencies between tasks.
@dynamic def my_dynamic_subwf(a: int, b: int) -> int: x = t1(a=a) return t2(b=b, x=x)
See the cookbook for a longer discussion.