Shell Tasks

Tags: Basic

To execute bash scripts within Flyte, you can utilize the ShellTask class. This example includes three shell tasks to execute bash commands.

First, import the necessary libraries.

import os
from typing import Tuple

import flytekit
from flytekit import kwtypes, task, workflow
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

With the required imports in place, you can proceed to define a shell task. To create a shell task, provide a name for it, specify the bash script to be executed, and define inputs and outputs if needed.

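# t1 appends a line to the input file and greps it for "Flyte".
t1 = ShellTask(
    name="task_1",
    debug=True,
    script="""
    set -ex
    echo "Hey there! Let's run some bash scripts using Flyte's ShellTask."
    echo "Showcasing Flyte's Shell Task." >> {inputs.x}
    if grep "Flyte" {inputs.x}
    then
        echo "Found it!" >> {inputs.x}
    else
        echo "Not found!"
    fi
    """,
    inputs=kwtypes(x=FlyteFile),
    output_locs=[OutputLocation(var="i", var_type=FlyteFile, location="{inputs.x}")],
)

# t2 copies the file into the directory and archives the directory.
t2 = ShellTask(
    name="task_2",
    debug=True,
    script="""
    set -ex
    cp {inputs.x} {inputs.y}
    tar -zcvf {outputs.j} {inputs.y}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory),
    output_locs=[OutputLocation(var="j", var_type=FlyteFile, location="{inputs.y}.tar.gz")],
)

# t3 unpacks the archive and writes the file's character count to output.txt.
t3 = ShellTask(
    name="task_3",
    debug=True,
    script="""
    set -ex
    tar -zxvf {inputs.z}
    cat {inputs.y}/$(basename {inputs.x}) | wc -m > {outputs.k}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory, z=FlyteFile),
    output_locs=[OutputLocation(var="k", var_type=FlyteFile, location="output.txt")],
)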

Here's a breakdown of the parameters of the ShellTask:

  • The inputs parameter allows you to specify the types of inputs that the task will accept.

  • The output_locs parameter is used to define the output locations, which can be FlyteFile or FlyteDirectory.

  • The script parameter contains the actual bash script that will be executed ({inputs.x}, {outputs.j}, etc. will be replaced with the actual input and output values).

  • The debug parameter, when set to True, prints the generated script and other debugging information, which helps when a script misbehaves.
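The placeholder substitution performed on the script can be pictured with Python's own str.format, which also resolves dotted names such as {inputs.x}. This is only an illustrative sketch: the render_script helper and the sample paths are hypothetical, and flytekit's actual interpolation logic may be implemented differently.

```python
from types import SimpleNamespace


def render_script(script: str, inputs: dict, outputs: dict) -> str:
    """Fill {inputs.<name>} and {outputs.<name>} placeholders in a script.

    Hypothetical helper for illustration only; it is not part of flytekit.
    """
    return script.format(
        inputs=SimpleNamespace(**inputs),
        outputs=SimpleNamespace(**outputs),
    )


# Sample paths are made up for the example.
rendered = render_script(
    "tar -zcvf {outputs.j} {inputs.y}",
    inputs={"y": "/tmp/testdata"},
    outputs={"j": "/tmp/testdata.tar.gz"},
)
print(rendered)
```

In this sketch, any other literal braces in the script would have to be doubled, because str.format treats every brace pair as a placeholder.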

We define a task to instantiate FlyteFile and FlyteDirectory. A .gitkeep file is created in the FlyteDirectory as a placeholder to ensure the directory exists.

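@task
def create_entities() -> Tuple[FlyteFile, FlyteDirectory]:
    working_dir = flytekit.current_context().working_directory
    # Create an empty file; close the descriptor right away to avoid leaking it.
    flytefile = os.path.join(working_dir, "test.txt")
    os.close(os.open(flytefile, os.O_CREAT))
    flytedir = os.path.join(working_dir, "testdata")
    os.makedirs(flytedir, exist_ok=True)
    # The .gitkeep placeholder ensures the directory is not empty.
    flytedir_file = os.path.join(flytedir, ".gitkeep")
    os.close(os.open(flytedir_file, os.O_CREAT))
    return flytefile, flytedir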

We create a workflow to define the dependencies between the tasks.

@workflow
def shell_task_wf() -> FlyteFile:
    x, y = create_entities()
    t1_out = t1(x=x)
    t2_out = t2(x=t1_out, y=y)
    t3_out = t3(x=x, y=y, z=t2_out)
    return t3_out
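To make the data flow between the three tasks concrete, the following sketch reproduces the effect of the scripts in plain Python, without Flyte or bash. It is a simplified stand-in, not how Flyte executes the tasks: the emulate_pipeline helper, the temporary paths, and the separate extraction directory are assumptions made for illustration.

```python
import os
import shutil
import tarfile
import tempfile


def emulate_pipeline(work_dir: str) -> int:
    """Mimic the effect of t1, t2 and t3 with plain Python (hypothetical helper)."""
    x = os.path.join(work_dir, "test.txt")
    y = os.path.join(work_dir, "testdata")
    os.makedirs(y, exist_ok=True)

    # t1: append a line, then append "Found it!" if the file mentions "Flyte".
    with open(x, "a") as f:
        f.write("Showcasing Flyte's Shell Task.\n")
    with open(x) as f:
        found = "Flyte" in f.read()
    if found:
        with open(x, "a") as f:
            f.write("Found it!\n")

    # t2: copy the file into the directory and archive the directory.
    shutil.copy(x, y)
    archive = y + ".tar.gz"
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(y, arcname=os.path.basename(y))

    # t3: unpack the archive and count the characters of the copied file.
    # Unlike the bash script, this sketch extracts into a separate directory.
    extract_dir = os.path.join(work_dir, "extracted")
    os.makedirs(extract_dir, exist_ok=True)
    with tarfile.open(archive, "r:gz") as tar:
        tar.extractall(extract_dir)
    copied = os.path.join(extract_dir, os.path.basename(y), os.path.basename(x))
    with open(copied) as f:
        return len(f.read())


with tempfile.TemporaryDirectory() as tmp:
    char_count = emulate_pipeline(tmp)
    print(f"Character count: {char_count}")
```

Each step consumes what the previous one produced, which is the same ordering the workflow expresses by passing t1_out into t2 and t2_out into t3.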

You can run the workflow locally.

if __name__ == "__main__":
    print(f"Running shell_task_wf() {shell_task_wf()}")