Run Bash Scripts Using ShellTask#

To run bash scripts from within Flyte, ShellTask can be used. In this example, let’s define three ShellTasks to run simple bash commands.

Note

The new input/output placeholder syntax of ShellTask is available starting Flytekit 0.30.0b8+.

import os
from typing import Tuple

import flytekit
from flytekit import kwtypes, task, workflow
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


t1 = ShellTask(
    name="task_1",
    debug=True,
    script="""
    set -ex
    echo "Hey there! Let's run some bash scripts using Flyte's ShellTask."
    echo "Showcasing Flyte's Shell Task." >> {inputs.x}
    if grep "Flyte" {inputs.x}
    then
        echo "Found it!" >> {inputs.x}
    else
        echo "Not found!"
    fi
    """,
    inputs=kwtypes(x=FlyteFile),
    output_locs=[
        OutputLocation(var="i", var_type=FlyteFile, location="{inputs.x}")
    ],
)


t2 = ShellTask(
    name="task_2",
    debug=True,
    script="""
    set -ex
    cp {inputs.x} {inputs.y}
    tar -zcvf {outputs.j} {inputs.y}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory),
    output_locs=[
        OutputLocation(var="j", var_type=FlyteFile, location="{inputs.y}.tar.gz")
    ],
)


t3 = ShellTask(
    name="task_3",
    debug=True,
    script="""
    set -ex
    tar -zxvf {inputs.z}
    cat {inputs.y}/$(basename {inputs.x}) | wc -m > {outputs.k}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory, z=FlyteFile),
    output_locs=[OutputLocation(var="k", var_type=FlyteFile, location="output.txt")],
)
  • The inputs parameter is useful to specify the types of inputs that the task will accept

  • The output_locs parameter is helpful to specify the output locations, could be a FlyteFile or FlyteDirectory

  • The script parameter is the actual bash script that will be executed ({inputs.x}, {outputs.j}, etc. will be replaced with the actual input and output values)

  • The debug parameter is useful for debugging

Next, we define a task to create FlyteFile and FlyteDirectory.

@task
def create_entities() -> Tuple[FlyteFile, FlyteDirectory]:
    working_dir = flytekit.current_context().working_directory
    flytefile = os.path.join(working_dir, "test.txt")
    os.open(flytefile, os.O_CREAT)
    flytedir = os.path.join(working_dir, "testdata")
    os.makedirs(flytedir, exist_ok=True)
    return flytefile, flytedir

The data passage between tasks can be witnessed in the following workflow:

@workflow
def wf() -> FlyteFile:
    x, y = create_entities()
    t1_out = t1(x=x)
    t2_out = t2(x=t1_out, y=y)
    t3_out = t3(x=x, y=y, z=t2_out)
    return t3_out


if __name__ == "__main__":
    print(f"Running wf() {wf()}")

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery